This is an automated email from the ASF dual-hosted git repository.

wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new c212da6  refactor:  use rocksdb_wrapper::get to reimplement incr (#651)
c212da6 is described below

commit c212da634c177c7cfd3ee64bea4c2f1658c05342
Author: zhao liwei <[email protected]>
AuthorDate: Thu Dec 10 08:45:10 2020 +0800

    refactor:  use rocksdb_wrapper::get to reimplement incr (#651)
---
 src/server/pegasus_server_impl.h                   |  1 +
 src/server/pegasus_server_write.h                  |  1 +
 src/server/pegasus_write_service.h                 |  1 +
 src/server/pegasus_write_service_impl.h            | 10 ++-
 src/server/rocksdb_wrapper.cpp                     | 70 ++++++++++++++++++
 src/server/rocksdb_wrapper.h                       | 56 ++++++++++++++
 src/server/test/CMakeLists.txt                     |  1 +
 .../test/pegasus_write_service_impl_test.cpp       | 25 ++++++-
 src/server/test/rocksdb_wrapper_test.cpp           | 85 ++++++++++++++++++++++
 9 files changed, 246 insertions(+), 4 deletions(-)

diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 5226212..6f58ee5 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -189,6 +189,7 @@ private:
 
     friend class pegasus_manual_compact_service;
     friend class pegasus_write_service;
+    friend class rocksdb_wrapper;
 
     // parse checkpoint directories in the data dir
     // checkpoint directory format is: "checkpoint.{decree}"
diff --git a/src/server/pegasus_server_write.h 
b/src/server/pegasus_server_write.h
index a2d0043..33386cf 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -74,6 +74,7 @@ private:
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_test;
     friend class pegasus_write_service_impl_test;
+    friend class rocksdb_wrapper_test;
 
     std::unique_ptr<pegasus_write_service> _write_svc;
     std::vector<put_rpc> _put_rpc_batch;
diff --git a/src/server/pegasus_write_service.h 
b/src/server/pegasus_write_service.h
index cb26c6f..6852a4c 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -180,6 +180,7 @@ private:
     friend class pegasus_write_service_test;
     friend class pegasus_write_service_impl_test;
     friend class pegasus_server_write_test;
+    friend class rocksdb_wrapper_test;
 
     pegasus_server_impl *_server;
 
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index baa0507..d9bdb75 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -25,6 +25,7 @@
 
 #include "base/pegasus_key_schema.h"
 #include "meta_store.h"
+#include "rocksdb_wrapper.h"
 
 #include <dsn/utility/fail_point.h>
 #include <dsn/utility/filesystem.h>
@@ -38,7 +39,7 @@ namespace server {
 static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
 static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
 static constexpr int FAIL_DB_WRITE = -103;
-static constexpr int FAIL_DB_GET = -104;
+extern const int FAIL_DB_GET;
 
 struct db_get_context
 {
@@ -97,6 +98,7 @@ public:
     {
         // disable write ahead logging as replication handles logging instead 
now
         _wt_opts.disableWAL = true;
+        _rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
     }
 
     int empty_put(int64_t decree)
@@ -196,7 +198,7 @@ public:
         int64_t new_value = 0;
         uint32_t new_expire_ts = 0;
         db_get_context get_ctx;
-        int err = db_get(raw_key, &get_ctx);
+        int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
         if (err != 0) {
             resp.error = err;
             return err;
@@ -207,7 +209,6 @@ public:
             new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
         } else if (get_ctx.expired) {
             // ttl timeout, set to 0 before increment
-            _pfc_recent_expire_count->increment();
             new_value = update.increment;
             new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
         } else {
@@ -861,6 +862,7 @@ private:
     friend class pegasus_write_service_test;
     friend class pegasus_server_write_test;
     friend class pegasus_write_service_impl_test;
+    friend class rocksdb_wrapper_test;
     FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
     FRIEND_TEST(pegasus_write_service_impl_test, 
verify_timetag_compatible_with_version_0);
 
@@ -877,6 +879,8 @@ private:
     ::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
     pegasus_value_generator _value_generator;
 
+    std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
+
     // for setting update_response.error after committed.
     std::vector<dsn::apps::update_response *> _update_responses;
 };
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
new file mode 100644
index 0000000..e7e8bb5
--- /dev/null
+++ b/src/server/rocksdb_wrapper.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "rocksdb_wrapper.h"
+
+#include <rocksdb/db.h>
+#include "pegasus_write_service_impl.h"
+
+namespace pegasus {
+namespace server {
+
+const int FAIL_DB_GET = -104;
+
+rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
+    : replica_base(server),
+      _db(server->_db),
+      _rd_opts(server->_data_cf_rd_opts),
+      _pegasus_data_version(server->_pegasus_data_version),
+      _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+{
+}
+
+int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
+{
+    FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return 
FAIL_DB_GET; });
+
+    rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), 
&(ctx->raw_value));
+    if (dsn_likely(s.ok())) {
+        // success
+        ctx->found = true;
+        ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, 
ctx->raw_value);
+        if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
+            ctx->expired = true;
+            _pfc_recent_expire_count->increment();
+        }
+        return rocksdb::Status::kOk;
+    } else if (s.IsNotFound()) {
+        // NotFound is an acceptable error
+        ctx->found = false;
+        return rocksdb::Status::kOk;
+    }
+
+    dsn::blob hash_key, sort_key;
+    pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), 
hash_key, sort_key);
+    derror_rocksdb("Get",
+                   s.ToString(),
+                   "hash_key: {}, sort_key: {}",
+                   utils::c_escape_string(hash_key),
+                   utils::c_escape_string(sort_key));
+    return s.code();
+}
+
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
new file mode 100644
index 0000000..07f49a4
--- /dev/null
+++ b/src/server/rocksdb_wrapper.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <dsn/dist/replication/replica_base.h>
+
+namespace rocksdb {
+class DB;
+class ReadOptions;
+} // namespace rocksdb
+
+namespace dsn {
+class perf_counter_wrapper;
+} // namespace dsn
+
+namespace pegasus {
+namespace server {
+struct db_get_context;
+class pegasus_server_impl;
+
+class rocksdb_wrapper : public dsn::replication::replica_base
+{
+public:
+    rocksdb_wrapper(pegasus_server_impl *server);
+
+    /// Calls RocksDB Get and store the result into `db_get_context`.
+    /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status 
code is returned.
+    /// \result ctx.expired=true if record expired. Still 0 is returned.
+    /// \result ctx.found=false if record is not found. Still 0 is returned.
+    int get(dsn::string_view raw_key, /*out*/ db_get_context *ctx);
+
+private:
+    rocksdb::DB *_db;
+    rocksdb::ReadOptions &_rd_opts;
+    const uint32_t _pegasus_data_version;
+    dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+};
+} // namespace server
+} // namespace pegasus
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index 0883445..80dc4d6 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -28,6 +28,7 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
                 "../hotspot_partition_calculator.cpp"
                 "../meta_store.cpp"
                 "../hotkey_collector.cpp"
+                "../rocksdb_wrapper.cpp"
 )
 
 set(MY_SRC_SEARCH_MODE "GLOB")
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp 
b/src/server/test/pegasus_write_service_impl_test.cpp
index 48f9fa8..4ba778a 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -26,12 +26,14 @@
 
 namespace pegasus {
 namespace server {
+extern const int FAIL_DB_GET;
 
 class pegasus_write_service_impl_test : public pegasus_server_test_base
 {
 protected:
     std::unique_ptr<pegasus_server_write> _server_write;
     pegasus_write_service::impl *_write_impl{nullptr};
+    rocksdb_wrapper *_rocksdb_wrapper{nullptr};
 
 public:
     void SetUp() override
@@ -39,6 +41,7 @@ public:
         start();
         _server_write = dsn::make_unique<pegasus_server_write>(_server.get(), 
true);
         _write_impl = _server_write->_write_svc->_impl.get();
+        _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
     }
 
     uint64_t read_timestamp_from(dsn::string_view raw_key)
@@ -70,7 +73,7 @@ public:
 
     int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
     {
-        return _write_impl->db_get(raw_key, get_ctx);
+        return _rocksdb_wrapper->get(raw_key, get_ctx);
     }
 
     void single_set(dsn::blob raw_key, dsn::blob user_value)
@@ -258,5 +261,25 @@ TEST_F(incr_test, fail_on_put)
     dsn::fail::teardown();
 }
 
+TEST_F(incr_test, incr_on_expire_record)
+{
+    // make the key expired
+    req.expire_ts_seconds = 1;
+    _write_impl->incr(0, req, resp);
+
+    // check whether the key is expired
+    db_get_context get_ctx;
+    db_get(req.key, &get_ctx);
+    ASSERT_TRUE(get_ctx.expired);
+
+    // incr the expired key
+    req.increment = 100;
+    req.expire_ts_seconds = 0;
+    _write_impl->incr(0, req, resp);
+    ASSERT_EQ(resp.new_value, 100);
+
+    db_get(req.key, &get_ctx);
+    ASSERT_TRUE(get_ctx.found);
+}
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/rocksdb_wrapper_test.cpp 
b/src/server/test/rocksdb_wrapper_test.cpp
new file mode 100644
index 0000000..6931db5
--- /dev/null
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "server/pegasus_server_write.h"
+#include "server/pegasus_write_service_impl.h"
+#include "pegasus_server_test_base.h"
+
+namespace pegasus {
+namespace server {
+class rocksdb_wrapper_test : public pegasus_server_test_base
+{
+protected:
+    std::unique_ptr<pegasus_server_write> _server_write;
+    pegasus_write_service::impl *_write_impl{nullptr};
+    rocksdb_wrapper *_rocksdb_wrapper{nullptr};
+    dsn::blob _raw_key;
+
+public:
+    void SetUp() override
+    {
+        start();
+        _server_write = dsn::make_unique<pegasus_server_write>(_server.get(), 
true);
+        _write_impl = _server_write->_write_svc->_impl.get();
+        _rocksdb_wrapper = _write_impl->_rocksdb_wrapper.get();
+
+        pegasus::pegasus_generate_key(
+            _raw_key, dsn::string_view("hash_key"), 
dsn::string_view("sort_key"));
+    }
+
+    void single_set(dsn::blob raw_key, dsn::blob user_value, int32_t 
expire_ts_seconds)
+    {
+        dsn::apps::update_request put;
+        put.key = raw_key;
+        put.value = user_value;
+        put.expire_ts_seconds = expire_ts_seconds;
+        db_write_context write_ctx;
+        dsn::apps::update_response put_resp;
+        _write_impl->batch_put(write_ctx, put, put_resp);
+        ASSERT_EQ(_write_impl->batch_commit(0), 0);
+    }
+};
+
+TEST_F(rocksdb_wrapper_test, get)
+{
+    // not found
+    db_get_context get_ctx1;
+    _rocksdb_wrapper->get(_raw_key, &get_ctx1);
+    ASSERT_FALSE(get_ctx1.found);
+
+    // expired
+    int32_t expired_ts = utils::epoch_now();
+    db_get_context get_ctx2;
+    single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+    _rocksdb_wrapper->get(_raw_key, &get_ctx2);
+    ASSERT_TRUE(get_ctx2.found);
+    ASSERT_TRUE(get_ctx2.expired);
+    ASSERT_EQ(get_ctx2.expire_ts, expired_ts);
+
+    // found
+    expired_ts = INT32_MAX;
+    db_get_context get_ctx3;
+    single_set(_raw_key, dsn::blob::create_from_bytes("abc"), expired_ts);
+    _rocksdb_wrapper->get(_raw_key, &get_ctx3);
+    ASSERT_TRUE(get_ctx2.found);
+    ASSERT_FALSE(get_ctx3.expired);
+    ASSERT_EQ(get_ctx3.expire_ts, expired_ts);
+}
+} // namespace server
+} // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to