This is an automated email from the ASF dual-hosted git repository.
wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 95ac849 fix: bugfix on incr (#608)
95ac849 is described below
commit 95ac84914b70312e1241e9917095c1d32584fae1
Author: Wu Tao <[email protected]>
AuthorDate: Fri Sep 25 11:34:03 2020 +0800
fix: bugfix on incr (#608)
---
src/server/pegasus_write_service_impl.h | 98 +++++++++++-----------
.../test/pegasus_write_service_impl_test.cpp | 26 ++++++
2 files changed, 76 insertions(+), 48 deletions(-)
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index e202ff1..ce01937 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -182,59 +182,61 @@ public:
uint32_t new_expire_ts = 0;
db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
- if (err == 0) {
- if (!get_ctx.found) {
- // old value is not found, set to 0 before increment
- new_value = update.increment;
- new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
- } else if (get_ctx.expired) {
- // ttl timeout, set to 0 before increment
- _pfc_recent_expire_count->increment();
+ if (err != 0) {
+ resp.error = err;
+ return err;
+ }
+ if (!get_ctx.found) {
+ // old value is not found, set to 0 before increment
+ new_value = update.increment;
+ new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
+ } else if (get_ctx.expired) {
+ // ttl timeout, set to 0 before increment
+ _pfc_recent_expire_count->increment();
+ new_value = update.increment;
+ new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
+ } else {
+ ::dsn::blob old_value;
+ pegasus_extract_user_data(
+ _pegasus_data_version, std::move(get_ctx.raw_value), old_value);
+ if (old_value.length() == 0) {
+ // empty old value, set to 0 before increment
new_value = update.increment;
- new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
- ::dsn::blob old_value;
- pegasus_extract_user_data(
- _pegasus_data_version, std::move(get_ctx.raw_value), old_value);
- if (old_value.length() == 0) {
- // empty old value, set to 0 before increment
- new_value = update.increment;
- } else {
- int64_t old_value_int;
- if (!dsn::buf2int64(old_value, old_value_int)) {
- // invalid old value
- derror_replica("incr failed: decree = {}, error = "
- "old value \"{}\" is not an integer or out of range",
- decree,
- utils::c_escape_string(old_value));
- resp.error = rocksdb::Status::kInvalidArgument;
- // we should write empty record to update rocksdb's last flushed decree
- return empty_put(decree);
- }
- new_value = old_value_int + update.increment;
- if ((update.increment > 0 && new_value < old_value_int) ||
- (update.increment < 0 && new_value > old_value_int)) {
- // new value is out of range, return old value by 'new_value'
- derror_replica("incr failed: decree = {}, error = "
- "new value is out of range, old_value = {}, increment = {}",
- decree,
- old_value_int,
- update.increment);
- resp.error = rocksdb::Status::kInvalidArgument;
- resp.new_value = old_value_int;
- // we should write empty record to update rocksdb's last flushed decree
- return empty_put(decree);
- }
+ int64_t old_value_int;
+ if (!dsn::buf2int64(old_value, old_value_int)) {
+ // invalid old value
+ derror_replica("incr failed: decree = {}, error = "
+ "old value \"{}\" is not an integer or out of range",
+ decree,
+ utils::c_escape_string(old_value));
+ resp.error = rocksdb::Status::kInvalidArgument;
+ // we should write empty record to update rocksdb's last flushed decree
+ return empty_put(decree);
}
- // set new ttl
- if (update.expire_ts_seconds == 0) {
- new_expire_ts = get_ctx.expire_ts;
- } else if (update.expire_ts_seconds < 0) {
- new_expire_ts = 0;
- } else { // update.expire_ts_seconds > 0
- new_expire_ts = update.expire_ts_seconds;
+ new_value = old_value_int + update.increment;
+ if ((update.increment > 0 && new_value < old_value_int) ||
+ (update.increment < 0 && new_value > old_value_int)) {
+ // new value is out of range, return old value by 'new_value'
+ derror_replica("incr failed: decree = {}, error = "
+ "new value is out of range, old_value = {}, increment = {}",
+ decree,
+ old_value_int,
+ update.increment);
+ resp.error = rocksdb::Status::kInvalidArgument;
+ resp.new_value = old_value_int;
+ // we should write empty record to update rocksdb's last flushed decree
+ return empty_put(decree);
}
}
+ // set new ttl
+ if (update.expire_ts_seconds == 0) {
+ new_expire_ts = get_ctx.expire_ts;
+ } else if (update.expire_ts_seconds < 0) {
+ new_expire_ts = 0;
+ } else { // update.expire_ts_seconds > 0
+ new_expire_ts = update.expire_ts_seconds;
+ }
}
resp.error =
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp
index 7f1aec0..f3ee0a8 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -217,5 +217,31 @@ TEST_F(incr_test, invalid_incr)
ASSERT_EQ(resp.new_value, 100);
}
+TEST_F(incr_test, fail_on_get)
+{
+ dsn::fail::setup();
+ dsn::fail::cfg("db_get", "100%1*return()");
+ // when db_get failed, incr should return an error.
+
+ req.increment = 10;
+ _write_impl->incr(1, req, resp);
+ ASSERT_EQ(resp.error, FAIL_DB_GET);
+
+ dsn::fail::teardown();
+}
+
+TEST_F(incr_test, fail_on_put)
+{
+ dsn::fail::setup();
+ dsn::fail::cfg("db_write_batch_put", "100%1*return()");
+ // when rocksdb put failed, incr should return an error.
+
+ req.increment = 10;
+ _write_impl->incr(1, req, resp);
+ ASSERT_EQ(resp.error, FAIL_DB_WRITE_BATCH_PUT);
+
+ dsn::fail::teardown();
+}
+
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]