This is an automated email from the ASF dual-hosted git repository.
smityz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new c2e6ea2 feat: range read count enhancement (#811)
c2e6ea2 is described below
commit c2e6ea2ea2aaab6968753980e468f1dcdd8d37d5
Author: HeYuchen <[email protected]>
AuthorDate: Tue Sep 7 10:39:15 2021 +0800
feat: range read count enhancement (#811)
---
src/server/pegasus_server_impl.cpp | 36 +++--
src/test/function_test/run.sh | 2 +
src/test/function_test/test_basic.cpp | 2 +-
src/test/function_test/test_range_read.cpp | 203 +++++++++++++++++++++++++++++
4 files changed, 228 insertions(+), 15 deletions(-)
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 0779b6c..4e8cf69 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -365,9 +365,11 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
return;
}
- uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
- uint32_t max_iteration_count =
- std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count);
+ uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count;
+ uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count;
+ if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) {
+ max_kv_count = request.max_kv_count;
+ }
int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
int32_t max_iteration_size_config =
_rng_rd_opts.multi_get_max_iteration_size > 0
@@ -463,7 +465,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
it->Seek(start);
bool first_exclusive = !start_inclusive;
- while (limiter->valid() && it->Valid()) {
+ while (count < max_kv_count && limiter->valid() && it->Valid()) {
// check stop sort key
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
@@ -535,7 +537,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
it->SeekForPrev(stop);
bool first_exclusive = !stop_inclusive;
std::vector<::dsn::apps::key_value> reverse_kvs;
- while (limiter->valid() && it->Valid()) {
+ while (count < max_kv_count && limiter->valid() && it->Valid()) {
// check start sort key
int c = it->key().compare(start);
if (c < 0 || (c == 0 && !start_inclusive)) {
@@ -1006,16 +1008,20 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
uint64_t filter_count = 0;
int32_t count = 0;
- uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX;
- uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
+ uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
+ if (request.batch_size > 0 && request.batch_size < batch_count) {
+ batch_count = request.batch_size;
+ }
resp.kvs.reserve(batch_count);
bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
- std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
- batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
+ std::unique_ptr<range_read_limiter> limiter =
+ dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
+ 0,
+ _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
- while (limiter->valid() && it->Valid()) {
+ while (count < batch_count && limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
@@ -1178,14 +1184,15 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
uint64_t filter_count = 0;
int32_t count = 0;
- uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX;
- uint32_t batch_count =
- std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
+ uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
+ if (context->batch_size > 0 && context->batch_size < batch_count) {
+ batch_count = context->batch_size;
+ }
std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
- while (limiter->valid() && it->Valid()) {
+ while (count < batch_count && limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
@@ -2186,6 +2193,7 @@ range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
::dsn::blob raw_key(key.data(), 0, key.size());
::dsn::blob hash_key, sort_key;
pegasus_restore_key(raw_key, hash_key, sort_key);
+
if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
if (_verbose_log) {
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index 260944e..f7abaa3 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -58,6 +58,8 @@ GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_muta
exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
+GTEST_OUTPUT="xml:$REPORT_DIR/range_read.xml" GTEST_FILTER="range_read_test.*" ./$test_case $config_file $table_name
+exit_if_fail $? "run test range_read failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp
index 592a6ce..d89edc8 100644
--- a/src/test/function_test/test_basic.cpp
+++ b/src/test/function_test/test_basic.cpp
@@ -568,7 +568,7 @@ TEST(basic, multi_get)
new_values.clear();
ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2);
ASSERT_EQ(PERR_INCOMPLETE, ret);
- ASSERT_EQ(1, (int)new_values.size());
+ ASSERT_EQ(2, (int)new_values.size());
ASSERT_EQ("1", new_values["1"]);
// multi_del
diff --git a/src/test/function_test/test_range_read.cpp b/src/test/function_test/test_range_read.cpp
new file mode 100644
index 0000000..4fca842
--- /dev/null
+++ b/src/test/function_test/test_range_read.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <pegasus/error.h>
+
+using namespace ::dsn;
+using namespace pegasus;
+
+class range_read_test : public testing::Test
+{
+public:
+ void prepare(const int32_t total_count, const int32_t expire_count)
+ {
+ if (expire_count > 0) {
+ // set expire values
+ for (auto i = 0; i < expire_count; i++) {
+ std::string sort_key = "1-" + std::to_string(i);
+ sortkeys.insert(sort_key);
+ kvs[sort_key] = value;
+ }
+ auto ret = pg_client->multi_set(hashkey, kvs, timeoutms, 1);
+ ASSERT_EQ(PERR_OK, ret);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ kvs.clear();
+ }
+
+ if (total_count > expire_count) {
+ // set normal values
+ for (auto i = expire_count; i < total_count; i++) {
+ std::string sort_key = "2-" + std::to_string(i);
+ sortkeys.insert(sort_key);
+ kvs[sort_key] = value;
+ }
+ auto ret = pg_client->multi_set(hashkey, kvs);
+ ASSERT_EQ(PERR_OK, ret);
+ }
+ }
+
+ void cleanup(const int32_t expected_deleted_count)
+ {
+ int64_t deleted_count;
+ auto ret = pg_client->multi_del(hashkey, sortkeys, deleted_count);
+ ASSERT_EQ(PERR_OK, ret);
+ ASSERT_EQ(deleted_count, expected_deleted_count);
+ sortkeys.clear();
+ kvs.clear();
+ }
+
+ void test_scan(const int32_t expire_count,
+ const int32_t total_count,
+ const int32_t batch_count,
+ const int32_t expected_scan_count)
+ {
+ pegasus::pegasus_client::scan_options options;
+ options.batch_size = batch_count;
+ pegasus::pegasus_client::pegasus_scanner *scanner;
+ auto ret = pg_client->get_scanner(hashkey, "", "", options, scanner);
+ ASSERT_EQ(ret, PERR_OK);
+
+ std::map<std::string, std::string> scan_kvs;
+ std::string hash_key;
+ std::string sort_key;
+ std::string act_value;
+ auto i = expire_count;
+ while (i < total_count) {
+ ret = scanner->next(hash_key, sort_key, act_value);
+ ASSERT_EQ(ret, PERR_OK);
+ ASSERT_EQ(hash_key, hashkey);
+ scan_kvs[sort_key] = act_value;
+ i++;
+ }
+ ret = scanner->next(hash_key, sort_key, act_value);
+ ASSERT_EQ(ret, PERR_SCAN_COMPLETE);
+ ASSERT_EQ(expected_scan_count, scan_kvs.size());
+ delete scanner;
+
+ // compare scan result
+ for (auto it1 = kvs.begin(), it2 = scan_kvs.begin();; ++it1, ++it2) {
+ if (it1 == kvs.end()) {
+ ASSERT_EQ(it2, scan_kvs.end());
+ break;
+ }
+ ASSERT_NE(it2, scan_kvs.end());
+ ASSERT_EQ(*it1, *it2);
+ }
+ }
+
+public:
+ const std::string table = "temp";
+ const std::string hashkey = "range_read_hashkey";
+ const std::string sortkey_prefix = "1-";
+ const std::string value = "value";
+ const int32_t timeoutms = 5000;
+ pegasus_client *pg_client = pegasus_client_factory::get_client("mycluster", table.c_str());
+ std::set<std::string> sortkeys;
+ std::map<std::string, std::string> kvs;
+};
+
+TEST_F(range_read_test, multiget_test)
+{
+ pegasus::pegasus_client::multi_get_options options;
+ std::map<std::string, std::string> new_values;
+ struct test_struct
+ {
+ int32_t expire_count;
+ int32_t total_count;
+ int32_t get_max_kv_count;
+ int32_t expected_error;
+ int32_t expected_value_count;
+ } tests[]{// total_count < max_kv_count <= max_iteration_count
+ {50, 50, 100, PERR_OK, 0},
+ {20, 50, 100, PERR_OK, 30},
+ {0, 50, 100, PERR_OK, 50},
+ // total_count > max_kv_count >= max_iteration
+ {3000, 4000, 3500, PERR_INCOMPLETE, 0},
+ {500, 4000, 3500, PERR_INCOMPLETE, 2500},
+ {0, 4000, 3500, PERR_INCOMPLETE, 3000},
+ // total_count > max_iteration_count > max_kv_count
+ {3000, 4000, 100, PERR_INCOMPLETE, 0},
+ {2950, 4000, 100, PERR_INCOMPLETE, 50},
+ {100, 4000, 100, PERR_INCOMPLETE, 100},
+ {20, 4000, 100, PERR_INCOMPLETE, 100},
+ {0, 4000, 100, PERR_INCOMPLETE, 100}};
+
+ for (auto test : tests) {
+ new_values.clear();
+ prepare(test.total_count, test.expire_count);
+ auto ret =
+ pg_client->multi_get(hashkey, "", "", options, new_values, test.get_max_kv_count);
+ ASSERT_EQ(ret, test.expected_error);
+ ASSERT_EQ(new_values.size(), test.expected_value_count);
+ cleanup(test.total_count);
+ }
+}
+
+TEST_F(range_read_test, sortkeycount_test)
+{
+ int64_t count;
+ struct test_struct
+ {
+ int32_t expire_count;
+ int32_t total_count;
+ int32_t expected_error;
+ int64_t expected_count;
+ } tests[]{{0, 500, PERR_OK, 500}, {500, 4000, PERR_OK, 3500}};
+
+ for (auto test : tests) {
+ prepare(test.total_count, test.expire_count);
+ auto ret = pg_client->sortkey_count(hashkey, count);
+ ASSERT_EQ(ret, test.expected_error);
+ ASSERT_EQ(count, test.expected_count);
+ cleanup(test.total_count);
+ }
+}
+
+TEST_F(range_read_test, scan_test)
+{
+ struct test_struct
+ {
+ int32_t expire_count;
+ int32_t total_count;
+ int32_t batch_size;
+ int32_t expected_scan_count;
+ } tests[]{// total_count < max_kv_count <= max_iteration_count
+ {50, 50, 100, 0},
+ {20, 50, 100, 30},
+ {0, 50, 100, 50},
+ // total_count > max_kv_count >= max_iteration
+ {3000, 4000, 3500, 1000},
+ {500, 4000, 3500, 3500},
+ {0, 4000, 3500, 4000},
+ // total_count > max_iteration_count > max_kv_count
+ {3000, 4000, 100, 1000},
+ {2950, 4000, 100, 1050},
+ {100, 4000, 100, 3900},
+ {20, 4000, 100, 3980},
+ {0, 4000, 100, 4000}};
+
+ for (auto test : tests) {
+ prepare(test.total_count, test.expire_count);
+ test_scan(test.expire_count, test.total_count, test.batch_size, test.expected_scan_count);
+ cleanup(test.total_count);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]