This is an automated email from the ASF dual-hosted git repository.
wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new c212da6 refactor: use rocksdb_wrapper::get to reimplement incr (#651)
c212da6 is described below
commit c212da634c177c7cfd3ee64bea4c2f1658c05342
Author: zhao liwei <[email protected]>
AuthorDate: Thu Dec 10 08:45:10 2020 +0800
refactor: use rocksdb_wrapper::get to reimplement incr (#651)
---
src/server/pegasus_server_impl.h | 1 +
src/server/pegasus_server_write.h | 1 +
src/server/pegasus_write_service.h | 1 +
src/server/pegasus_write_service_impl.h | 10 ++-
src/server/rocksdb_wrapper.cpp | 70 ++++++++++++++++++
src/server/rocksdb_wrapper.h | 56 ++++++++++++++
src/server/test/CMakeLists.txt | 1 +
.../test/pegasus_write_service_impl_test.cpp | 25 ++++++-
src/server/test/rocksdb_wrapper_test.cpp | 85 ++++++++++++++++++++++
9 files changed, 246 insertions(+), 4 deletions(-)
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 5226212..6f58ee5 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -189,6 +189,7 @@ private:
friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
+ friend class rocksdb_wrapper;
// parse checkpoint directories in the data dir
// checkpoint directory format is: "checkpoint.{decree}"
diff --git a/src/server/pegasus_server_write.h
b/src/server/pegasus_server_write.h
index a2d0043..33386cf 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -74,6 +74,7 @@ private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
+ friend class rocksdb_wrapper_test;
std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
diff --git a/src/server/pegasus_write_service.h
b/src/server/pegasus_write_service.h
index cb26c6f..6852a4c 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -180,6 +180,7 @@ private:
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;
+ friend class rocksdb_wrapper_test;
pegasus_server_impl *_server;
diff --git a/src/server/pegasus_write_service_impl.h
b/src/server/pegasus_write_service_impl.h
index baa0507..d9bdb75 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -25,6 +25,7 @@
#include "base/pegasus_key_schema.h"
#include "meta_store.h"
+#include "rocksdb_wrapper.h"
#include <dsn/utility/fail_point.h>
#include <dsn/utility/filesystem.h>
@@ -38,7 +39,7 @@ namespace server {
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
-static constexpr int FAIL_DB_GET = -104;
+extern const int FAIL_DB_GET;
struct db_get_context
{
@@ -97,6 +98,7 @@ public:
{
// disable write ahead logging as replication handles logging instead
now
_wt_opts.disableWAL = true;
+ _rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
}
int empty_put(int64_t decree)
@@ -196,7 +198,7 @@ public:
int64_t new_value = 0;
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
- int err = db_get(raw_key, &get_ctx);
+ int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
if (err != 0) {
resp.error = err;
return err;
@@ -207,7 +209,6 @@ public:
new_expire_ts = update.expire_ts_seconds > 0 ?
update.expire_ts_seconds : 0;
} else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
- _pfc_recent_expire_count->increment();
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ?
update.expire_ts_seconds : 0;
} else {
@@ -861,6 +862,7 @@ private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
+ friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test,
verify_timetag_compatible_with_version_0);
@@ -877,6 +879,8 @@ private:
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
pegasus_value_generator _value_generator;
+ std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
+
// for setting update_response.error after committed.
std::vector<dsn::apps::update_response *> _update_responses;
};
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
new file mode 100644
index 0000000..e7e8bb5
--- /dev/null
+++ b/src/server/rocksdb_wrapper.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "rocksdb_wrapper.h"
+
+#include <rocksdb/db.h>
+#include "pegasus_write_service_impl.h"
+
+namespace pegasus {
+namespace server {
+
+const int FAIL_DB_GET = -104;
+
+rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
+ : replica_base(server),
+ _db(server->_db),
+ _rd_opts(server->_data_cf_rd_opts),
+ _pegasus_data_version(server->_pegasus_data_version),
+ _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+{
+}
+
+int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
+{
+ FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return
FAIL_DB_GET; });
+
+ rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key),
&(ctx->raw_value));
+ if (dsn_likely(s.ok())) {
+ // success
+ ctx->found = true;
+ ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version,
ctx->raw_value);
+ if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
+ ctx->expired = true;
+ _pfc_recent_expire_count->increment();
+ }
+ return rocksdb::Status::kOk;
+ } else if (s.IsNotFound()) {
+ // NotFound is an acceptable error
+ ctx->found = false;
+ return rocksdb::Status::kOk;
+ }
+
+ dsn::blob hash_key, sort_key;
+ pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()),
hash_key, sort_key);
+ derror_rocksdb("Get",
+ s.ToString(),
+ "hash_key: {}, sort_key: {}",
+ utils::c_escape_string(hash_key),
+ utils::c_escape_string(sort_key));
+ return s.code();
+}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
new file mode 100644
index 0000000..07f49a4
--- /dev/null
+++ b/src/server/rocksdb_wrapper.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <dsn/dist/replication/replica_base.h>
+
+namespace rocksdb {
+class DB;
+class ReadOptions;
+} // namespace rocksdb
+
+namespace dsn {
+class perf_counter_wrapper;
+} // namespace dsn
+
+namespace pegasus {
+namespace server {
+struct db_get_context;
+class pegasus_server_impl;
+
+class rocksdb_wrapper : public dsn::replication::replica_base
+{
+public:
+ rocksdb_wrapper(pegasus_server_impl *server);
+
+ /// Calls RocksDB Get and store the result into `db_get_context`.
+ /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status
code is returned.
+ /// \result ctx.expired=true if record expired. Still 0 is returned.
+ /// \result ctx.found=false if record is not found. Still 0 is returned.
+ int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
+
+private:
+ rocksdb::DB *_db;
+ rocksdb::ReadOptions &_rd_opts;
+ const uint32_t _pegasus_data_version;
+ dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+};
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 0883445..80dc4d6 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
+ "../rocksdb_wrapper.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp
b/src/server/test/pegasus_write_service_impl_test.cpp
index 48f9fa8..4ba778a 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -26,12 +26,14 @@
namespace pegasus {
namespace server {
+extern const int FAIL_DB_GET;
class pegasus_write_service_impl_test : public pegasus_server_test_base
{
protected:
std::unique_ptr<pegasus_server_write> _server_write;
pegasus_write_service::impl *_write_impl{nullptr};
+ rocksdb_wrapper *_rocksdb_wrapper{nullptr};
public:
void SetUp() override
@@ -39,6 +41,7 @@ public:
start();
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(),
true);
_write_impl = _server_write->_write_svc->_impl.get();
+ _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
}
uint64_t read_timestamp_from(dsn::string_view raw_key)
@@ -70,7 +73,7 @@ public:
int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
{
- return _write_impl->db_get(raw_key, get_ctx);
+ return _rocksdb_wrapper->get(raw_key, get_ctx);
}
void single_set(dsn::blob raw_key, dsn::blob user_value)
@@ -258,5 +261,25 @@ TEST_F(incr_test, fail_on_put)
dsn::fail::teardown();
}
+TEST_F(incr_test, incr_on_expire_record)
+{
+ // make the key expired
+ req.expire_ts_seconds = 1;
+ _write_impl->incr(0, req, resp);
+
+ // check whether the key is expired
+ db_get_context get_ctx;
+ db_get(req.key, &get_ctx);
+ ASSERT_TRUE(get_ctx.expired);
+
+ // incr the expired key
+ req.increment = 100;
+ req.expire_ts_seconds = 0;
+ _write_impl->incr(0, req, resp);
+ ASSERT_EQ(resp.new_value, 100);
+
+ db_get(req.key, &get_ctx);
+ ASSERT_TRUE(get_ctx.found);
+}
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/rocksdb_wrapper_test.cpp
b/src/server/test/rocksdb_wrapper_test.cpp
new file mode 100644
index 0000000..6931db5
--- /dev/null
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/pegasus_server_write.h"
+#include "server/pegasus_write_service_impl.h"
+#include "pegasus_server_test_base.h"
+
+namespace pegasus {
+namespace server {
+class rocksdb_wrapper_test : public pegasus_server_test_base
+{
+protected:
+ std::unique_ptr<pegasus_server_write> _server_write;
+ pegasus_write_service::impl *_write_impl{nullptr};
+ rocksdb_wrapper *_rocksdb_wrapper{nullptr};
+ dsn::blob _raw_key;
+
+public:
+ void SetUp() override
+ {
+ start();
+ _server_write = dsn::make_unique<pegasus_server_write>(_server.get(),
true);
+ _write_impl = _server_write->_write_svc->_impl.get();
+ _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
+
+ pegasus::pegasus_generate_key(
+ _raw_key, dsn::string_view("hash_key"),
dsn::string_view("sort_key"));
+ }
+
+ void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t
expire_ts_seconds)
+ {
+ dsn::apps::update_request put;
+ put.key = raw_key;
+ put.value = user_value;
+ put.expire_ts_seconds = expire_ts_seconds;
+ db_write_context write_ctx;
+ dsn::apps::update_response put_resp;
+ _write_impl->batch_put(write_ctx, put, put_resp);
+ ASSERT_EQ(_write_impl->batch_commit(0), 0);
+ }
+};
+
+TEST_F(rocksdb_wrapper_test, get)
+{
+ // not found
+ db_get_context get_ctx1;
+ _rocksdb_wrapper->get(_raw_key, &get_ctx1);
+ ASSERT_FALSE(get_ctx1.found);
+
+ // expired
+ int32_t expired_ts = utils::epoch_now();
+ db_get_context get_ctx2;
+ single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+ _rocksdb_wrapper->get(_raw_key, &get_ctx2);
+ ASSERT_TRUE(get_ctx2.found);
+ ASSERT_TRUE(get_ctx2.expired);
+ ASSERT_EQ(get_ctx2.expire_ts, expired_ts);
+
+ // found
+ expired_ts = INT32_MAX;
+ db_get_context get_ctx3;
+ single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+ _rocksdb_wrapper->get(_raw_key, &get_ctx3);
+ ASSERT_TRUE(get_ctx2.found);
+ ASSERT_FALSE(get_ctx3.expired);
+ ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
+}
+} // namespace server
+} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]