This is an automated email from the ASF dual-hosted git repository.

smityz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e6ea2  feat: range read count enhancement (#811)
c2e6ea2 is described below

commit c2e6ea2ea2aaab6968753980e468f1dcdd8d37d5
Author: HeYuchen <[email protected]>
AuthorDate: Tue Sep 7 10:39:15 2021 +0800

    feat: range read count enhancement (#811)
---
 src/server/pegasus_server_impl.cpp         |  36 +++--
 src/test/function_test/run.sh              |   2 +
 src/test/function_test/test_basic.cpp      |   2 +-
 src/test/function_test/test_range_read.cpp | 203 +++++++++++++++++++++++++++++
 4 files changed, 228 insertions(+), 15 deletions(-)

diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 0779b6c..4e8cf69 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -365,9 +365,11 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         return;
     }
 
-    uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
-    uint32_t max_iteration_count =
-        std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count);
+    uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count;
+    uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count;
+    if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) {
+        max_kv_count = request.max_kv_count;
+    }
 
     int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
     int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
@@ -463,7 +465,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
             it->Seek(start);
             bool first_exclusive = !start_inclusive;
-            while (limiter->valid() && it->Valid()) {
+            while (count < max_kv_count && limiter->valid() && it->Valid()) {
                 // check stop sort key
                 int c = it->key().compare(stop);
                 if (c > 0 || (c == 0 && !stop_inclusive)) {
@@ -535,7 +537,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             it->SeekForPrev(stop);
             bool first_exclusive = !stop_inclusive;
             std::vector<::dsn::apps::key_value> reverse_kvs;
-            while (limiter->valid() && it->Valid()) {
+            while (count < max_kv_count && limiter->valid() && it->Valid()) {
                 // check start sort key
                 int c = it->key().compare(start);
                 if (c < 0 || (c == 0 && !start_inclusive)) {
@@ -1006,16 +1008,20 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
     uint64_t filter_count = 0;
     int32_t count = 0;
 
-    uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX;
-    uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
+    uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
+    if (request.batch_size > 0 && request.batch_size < batch_count) {
+        batch_count = request.batch_size;
+    }
     resp.kvs.reserve(batch_count);
 
     bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
 
-    std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
-        batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
+    std::unique_ptr<range_read_limiter> limiter =
+        dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
+                                             0,
+                                             _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
 
-    while (limiter->valid() && it->Valid()) {
+    while (count < batch_count && limiter->valid() && it->Valid()) {
         int c = it->key().compare(stop);
         if (c > 0 || (c == 0 && !stop_inclusive)) {
             // out of range
@@ -1178,14 +1184,15 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         uint64_t filter_count = 0;
         int32_t count = 0;
 
-        uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX;
-        uint32_t batch_count =
-            std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
+        uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
+        if (context->batch_size > 0 && context->batch_size < batch_count) {
+            batch_count = context->batch_size;
+        }
 
         std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
             batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
 
-        while (limiter->valid() && it->Valid()) {
+        while (count < batch_count && limiter->valid() && it->Valid()) {
             int c = it->key().compare(stop);
             if (c > 0 || (c == 0 && !stop_inclusive)) {
                 // out of range
@@ -2186,6 +2193,7 @@ range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
     ::dsn::blob raw_key(key.data(), 0, key.size());
     ::dsn::blob hash_key, sort_key;
     pegasus_restore_key(raw_key, hash_key, sort_key);
+
     if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
         !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
         if (_verbose_log) {
diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index 260944e..f7abaa3 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -58,6 +58,8 @@ GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_muta
 exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
 GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
 exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
+GTEST_OUTPUT="xml:$REPORT_DIR/range_read.xml" GTEST_FILTER="range_read_test.*" ./$test_case $config_file $table_name
+exit_if_fail $? "run test range_read failed: $test_case $config_file $table_name"
 GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
 exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
 GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp
index 592a6ce..d89edc8 100644
--- a/src/test/function_test/test_basic.cpp
+++ b/src/test/function_test/test_basic.cpp
@@ -568,7 +568,7 @@ TEST(basic, multi_get)
     new_values.clear();
     ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2);
     ASSERT_EQ(PERR_INCOMPLETE, ret);
-    ASSERT_EQ(1, (int)new_values.size());
+    ASSERT_EQ(2, (int)new_values.size());
     ASSERT_EQ("1", new_values["1"]);
 
     // multi_del
diff --git a/src/test/function_test/test_range_read.cpp b/src/test/function_test/test_range_read.cpp
new file mode 100644
index 0000000..4fca842
--- /dev/null
+++ b/src/test/function_test/test_range_read.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <dsn/service_api_c.h>
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <pegasus/error.h>
+
+using namespace ::dsn;
+using namespace pegasus;
+
+class range_read_test : public testing::Test
+{
+public:
+    void prepare(const int32_t total_count, const int32_t expire_count)
+    {
+        if (expire_count > 0) {
+            // set expire values
+            for (auto i = 0; i < expire_count; i++) {
+                std::string sort_key = "1-" + std::to_string(i);
+                sortkeys.insert(sort_key);
+                kvs[sort_key] = value;
+            }
+            auto ret = pg_client->multi_set(hashkey, kvs, timeoutms, 1);
+            ASSERT_EQ(PERR_OK, ret);
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            kvs.clear();
+        }
+
+        if (total_count > expire_count) {
+            // set normal values
+            for (auto i = expire_count; i < total_count; i++) {
+                std::string sort_key = "2-" + std::to_string(i);
+                sortkeys.insert(sort_key);
+                kvs[sort_key] = value;
+            }
+            auto ret = pg_client->multi_set(hashkey, kvs);
+            ASSERT_EQ(PERR_OK, ret);
+        }
+    }
+
+    void cleanup(const int32_t expected_deleted_count)
+    {
+        int64_t deleted_count;
+        auto ret = pg_client->multi_del(hashkey, sortkeys, deleted_count);
+        ASSERT_EQ(PERR_OK, ret);
+        ASSERT_EQ(deleted_count, expected_deleted_count);
+        sortkeys.clear();
+        kvs.clear();
+    }
+
+    void test_scan(const int32_t expire_count,
+                   const int32_t total_count,
+                   const int32_t batch_count,
+                   const int32_t expected_scan_count)
+    {
+        pegasus::pegasus_client::scan_options options;
+        options.batch_size = batch_count;
+        pegasus::pegasus_client::pegasus_scanner *scanner;
+        auto ret = pg_client->get_scanner(hashkey, "", "", options, scanner);
+        ASSERT_EQ(ret, PERR_OK);
+
+        std::map<std::string, std::string> scan_kvs;
+        std::string hash_key;
+        std::string sort_key;
+        std::string act_value;
+        auto i = expire_count;
+        while (i < total_count) {
+            ret = scanner->next(hash_key, sort_key, act_value);
+            ASSERT_EQ(ret, PERR_OK);
+            ASSERT_EQ(hash_key, hashkey);
+            scan_kvs[sort_key] = act_value;
+            i++;
+        }
+        ret = scanner->next(hash_key, sort_key, act_value);
+        ASSERT_EQ(ret, PERR_SCAN_COMPLETE);
+        ASSERT_EQ(expected_scan_count, scan_kvs.size());
+        delete scanner;
+
+        // compare scan result
+        for (auto it1 = kvs.begin(), it2 = scan_kvs.begin();; ++it1, ++it2) {
+            if (it1 == kvs.end()) {
+                ASSERT_EQ(it2, scan_kvs.end());
+                break;
+            }
+            ASSERT_NE(it2, scan_kvs.end());
+            ASSERT_EQ(*it1, *it2);
+        }
+    }
+
+public:
+    const std::string table = "temp";
+    const std::string hashkey = "range_read_hashkey";
+    const std::string sortkey_prefix = "1-";
+    const std::string value = "value";
+    const int32_t timeoutms = 5000;
+    pegasus_client *pg_client = pegasus_client_factory::get_client("mycluster", table.c_str());
+    std::set<std::string> sortkeys;
+    std::map<std::string, std::string> kvs;
+};
+
+TEST_F(range_read_test, multiget_test)
+{
+    pegasus::pegasus_client::multi_get_options options;
+    std::map<std::string, std::string> new_values;
+    struct test_struct
+    {
+        int32_t expire_count;
+        int32_t total_count;
+        int32_t get_max_kv_count;
+        int32_t expected_error;
+        int32_t expected_value_count;
+    } tests[]{// total_count < max_kv_count <= max_iteration_count
+              {50, 50, 100, PERR_OK, 0},
+              {20, 50, 100, PERR_OK, 30},
+              {0, 50, 100, PERR_OK, 50},
+              // total_count > max_kv_count >= max_iteration
+              {3000, 4000, 3500, PERR_INCOMPLETE, 0},
+              {500, 4000, 3500, PERR_INCOMPLETE, 2500},
+              {0, 4000, 3500, PERR_INCOMPLETE, 3000},
+              //  total_count > max_iteration_count > max_kv_count
+              {3000, 4000, 100, PERR_INCOMPLETE, 0},
+              {2950, 4000, 100, PERR_INCOMPLETE, 50},
+              {100, 4000, 100, PERR_INCOMPLETE, 100},
+              {20, 4000, 100, PERR_INCOMPLETE, 100},
+              {0, 4000, 100, PERR_INCOMPLETE, 100}};
+
+    for (auto test : tests) {
+        new_values.clear();
+        prepare(test.total_count, test.expire_count);
+        auto ret =
+            pg_client->multi_get(hashkey, "", "", options, new_values, test.get_max_kv_count);
+        ASSERT_EQ(ret, test.expected_error);
+        ASSERT_EQ(new_values.size(), test.expected_value_count);
+        cleanup(test.total_count);
+    }
+}
+
+TEST_F(range_read_test, sortkeycount_test)
+{
+    int64_t count;
+    struct test_struct
+    {
+        int32_t expire_count;
+        int32_t total_count;
+        int32_t expected_error;
+        int64_t expected_count;
+    } tests[]{{0, 500, PERR_OK, 500}, {500, 4000, PERR_OK, 3500}};
+
+    for (auto test : tests) {
+        prepare(test.total_count, test.expire_count);
+        auto ret = pg_client->sortkey_count(hashkey, count);
+        ASSERT_EQ(ret, test.expected_error);
+        ASSERT_EQ(count, test.expected_count);
+        cleanup(test.total_count);
+    }
+}
+
+TEST_F(range_read_test, scan_test)
+{
+    struct test_struct
+    {
+        int32_t expire_count;
+        int32_t total_count;
+        int32_t batch_size;
+        int32_t expected_scan_count;
+    } tests[]{// total_count < max_kv_count <= max_iteration_count
+              {50, 50, 100, 0},
+              {20, 50, 100, 30},
+              {0, 50, 100, 50},
+              // total_count > max_kv_count >= max_iteration
+              {3000, 4000, 3500, 1000},
+              {500, 4000, 3500, 3500},
+              {0, 4000, 3500, 4000},
+              //  total_count > max_iteration_count > max_kv_count
+              {3000, 4000, 100, 1000},
+              {2950, 4000, 100, 1050},
+              {100, 4000, 100, 3900},
+              {20, 4000, 100, 3980},
+              {0, 4000, 100, 4000}};
+
+    for (auto test : tests) {
+        prepare(test.total_count, test.expire_count);
+        test_scan(test.expire_count, test.total_count, test.batch_size, test.expected_scan_count);
+        cleanup(test.total_count);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to