This is an automated email from the ASF dual-hosted git repository.

wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 95ac849  fix: bugfix on incr (#608)
95ac849 is described below

commit 95ac84914b70312e1241e9917095c1d32584fae1
Author: Wu Tao <[email protected]>
AuthorDate: Fri Sep 25 11:34:03 2020 +0800

    fix: bugfix on incr (#608)
---
 src/server/pegasus_write_service_impl.h            | 98 +++++++++++-----------
 .../test/pegasus_write_service_impl_test.cpp       | 26 ++++++
 2 files changed, 76 insertions(+), 48 deletions(-)

diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index e202ff1..ce01937 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -182,59 +182,61 @@ public:
         uint32_t new_expire_ts = 0;
         db_get_context get_ctx;
         int err = db_get(raw_key, &get_ctx);
-        if (err == 0) {
-            if (!get_ctx.found) {
-                // old value is not found, set to 0 before increment
-                new_value = update.increment;
-                new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
-            } else if (get_ctx.expired) {
-                // ttl timeout, set to 0 before increment
-                _pfc_recent_expire_count->increment();
+        if (err != 0) {
+            resp.error = err;
+            return err;
+        }
+        if (!get_ctx.found) {
+            // old value is not found, set to 0 before increment
+            new_value = update.increment;
+            new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
+        } else if (get_ctx.expired) {
+            // ttl timeout, set to 0 before increment
+            _pfc_recent_expire_count->increment();
+            new_value = update.increment;
+            new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
+        } else {
+            ::dsn::blob old_value;
+            pegasus_extract_user_data(
+                _pegasus_data_version, std::move(get_ctx.raw_value), 
old_value);
+            if (old_value.length() == 0) {
+                // empty old value, set to 0 before increment
                 new_value = update.increment;
-                new_expire_ts = update.expire_ts_seconds > 0 ? 
update.expire_ts_seconds : 0;
             } else {
-                ::dsn::blob old_value;
-                pegasus_extract_user_data(
-                    _pegasus_data_version, std::move(get_ctx.raw_value), 
old_value);
-                if (old_value.length() == 0) {
-                    // empty old value, set to 0 before increment
-                    new_value = update.increment;
-                } else {
-                    int64_t old_value_int;
-                    if (!dsn::buf2int64(old_value, old_value_int)) {
-                        // invalid old value
-                        derror_replica("incr failed: decree = {}, error = "
-                                       "old value \"{}\" is not an integer or 
out of range",
-                                       decree,
-                                       utils::c_escape_string(old_value));
-                        resp.error = rocksdb::Status::kInvalidArgument;
-                        // we should write empty record to update rocksdb's 
last flushed decree
-                        return empty_put(decree);
-                    }
-                    new_value = old_value_int + update.increment;
-                    if ((update.increment > 0 && new_value < old_value_int) ||
-                        (update.increment < 0 && new_value > old_value_int)) {
-                        // new value is out of range, return old value by 
'new_value'
-                        derror_replica("incr failed: decree = {}, error = "
-                                       "new value is out of range, old_value = 
{}, increment = {}",
-                                       decree,
-                                       old_value_int,
-                                       update.increment);
-                        resp.error = rocksdb::Status::kInvalidArgument;
-                        resp.new_value = old_value_int;
-                        // we should write empty record to update rocksdb's 
last flushed decree
-                        return empty_put(decree);
-                    }
+                int64_t old_value_int;
+                if (!dsn::buf2int64(old_value, old_value_int)) {
+                    // invalid old value
+                    derror_replica("incr failed: decree = {}, error = "
+                                   "old value \"{}\" is not an integer or out 
of range",
+                                   decree,
+                                   utils::c_escape_string(old_value));
+                    resp.error = rocksdb::Status::kInvalidArgument;
+                    // we should write empty record to update rocksdb's last 
flushed decree
+                    return empty_put(decree);
                 }
-                // set new ttl
-                if (update.expire_ts_seconds == 0) {
-                    new_expire_ts = get_ctx.expire_ts;
-                } else if (update.expire_ts_seconds < 0) {
-                    new_expire_ts = 0;
-                } else { // update.expire_ts_seconds > 0
-                    new_expire_ts = update.expire_ts_seconds;
+                new_value = old_value_int + update.increment;
+                if ((update.increment > 0 && new_value < old_value_int) ||
+                    (update.increment < 0 && new_value > old_value_int)) {
+                    // new value is out of range, return old value by 
'new_value'
+                    derror_replica("incr failed: decree = {}, error = "
+                                   "new value is out of range, old_value = {}, 
increment = {}",
+                                   decree,
+                                   old_value_int,
+                                   update.increment);
+                    resp.error = rocksdb::Status::kInvalidArgument;
+                    resp.new_value = old_value_int;
+                    // we should write empty record to update rocksdb's last 
flushed decree
+                    return empty_put(decree);
                 }
             }
+            // set new ttl
+            if (update.expire_ts_seconds == 0) {
+                new_expire_ts = get_ctx.expire_ts;
+            } else if (update.expire_ts_seconds < 0) {
+                new_expire_ts = 0;
+            } else { // update.expire_ts_seconds > 0
+                new_expire_ts = update.expire_ts_seconds;
+            }
         }
 
         resp.error =
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp 
b/src/server/test/pegasus_write_service_impl_test.cpp
index 7f1aec0..f3ee0a8 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -217,5 +217,31 @@ TEST_F(incr_test, invalid_incr)
     ASSERT_EQ(resp.new_value, 100);
 }
 
+TEST_F(incr_test, fail_on_get)
+{
+    dsn::fail::setup();
+    dsn::fail::cfg("db_get", "100%1*return()");
+    // when db_get failed, incr should return an error.
+
+    req.increment = 10;
+    _write_impl->incr(1, req, resp);
+    ASSERT_EQ(resp.error, FAIL_DB_GET);
+
+    dsn::fail::teardown();
+}
+
+TEST_F(incr_test, fail_on_put)
+{
+    dsn::fail::setup();
+    dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+    // when rocksdb put failed, incr should return an error.
+
+    req.increment = 10;
+    _write_impl->incr(1, req, resp);
+    ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT);
+
+    dsn::fail::teardown();
+}
+
 } // namespace server
 } // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to