This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 46281da  feat(split): add split function test (#733)
46281da is described below

commit 46281dad19ffb8b6da94ab5b460ed87f386395ce
Author: HeYuchen <[email protected]>
AuthorDate: Wed Jun 30 18:03:35 2021 +0800

    feat(split): add split function test (#733)
---
 src/test/function_test/run.sh         |   2 +
 src/test/function_test/test_split.cpp | 355 ++++++++++++++++++++++++++++++++++
 2 files changed, 357 insertions(+)

diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh
index f0deb5a..260944e 100755
--- a/src/test/function_test/run.sh
+++ b/src/test/function_test/run.sh
@@ -75,4 +75,6 @@ if [ $on_travis == "NO" ]; then
     exit_if_fail $? "run test test_detect_hotspot load failed: $test_case 
$config_file $table_name"
     GTEST_OUTPUT="xml:$REPORT_DIR/backup_restore_test.xml" 
GTEST_FILTER="backup_restore_test.*" ./$test_case $config_file $table_name
     exit_if_fail $? "run test backup_restore_test load failed: $test_case 
$config_file $table_name"
+    GTEST_OUTPUT="xml:$REPORT_DIR/split.xml" 
GTEST_FILTER="partition_split_test.*" ./$test_case $config_file $table_name
+    exit_if_fail $? "run test split failed: $test_case $config_file 
$table_name"
 fi
diff --git a/src/test/function_test/test_split.cpp 
b/src/test/function_test/test_split.cpp
new file mode 100644
index 0000000..3ce27cb
--- /dev/null
+++ b/src/test/function_test/test_split.cpp
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <pegasus/client.h>
+#include <boost/lexical_cast.hpp>
+
+#include <dsn/dist/replication/replication_ddl_client.h>
+
+#include "base/pegasus_const.h"
+
+using namespace dsn;
+using namespace dsn::replication;
+using namespace pegasus;
+
+class partition_split_test : public testing::Test
+{
+public:
+    void prepare(const std::string &table_name)
+    {
+        std::vector<rpc_address> meta_list;
+        replica_helper::load_meta_servers(
+            meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), "mycluster");
+
+        ddl_client = std::make_shared<replication_ddl_client>(meta_list);
+        create_table(table_name);
+
+        pg_client = pegasus_client_factory::get_client("mycluster", 
table_name.c_str());
+        write_data_before_split();
+
+        start_partition_split(table_name);
+    }
+
+    void cleanup(const std::string &table_name)
+    {
+        count_during_split = 0;
+        expected.clear();
+        drop_table(table_name);
+        delete pg_client;
+    }
+
+    void create_table(const std::string &table_name)
+    {
+        error_code error =
+            ddl_client->create_app(table_name, "pegasus", partition_count, 3, 
{}, false);
+        ASSERT_EQ(ERR_OK, error);
+    }
+
+    void drop_table(const std::string &table_name)
+    {
+        error_code error = ddl_client->drop_app(table_name, 1);
+        ASSERT_EQ(ERR_OK, error);
+    }
+
+    void start_partition_split(const std::string &table_name)
+    {
+        auto err_resp = ddl_client->start_partition_split(table_name, 
partition_count * 2);
+        ASSERT_EQ(ERR_OK, err_resp.get_value().err);
+        std::cout << "Table(" << table_name << ") start partition split 
succeed" << std::endl;
+    }
+
+    bool is_split_finished(const std::string &table_name)
+    {
+        auto err_resp = ddl_client->query_partition_split(table_name);
+        return err_resp.get_value().err == ERR_INVALID_STATE;
+    }
+
+    bool is_single_partition(int32_t target_pidx) { return target_pidx > 0; }
+
+    bool check_partition_split_status(const std::string &table_name,
+                                      int32_t target_pidx,
+                                      split_status::type target_status)
+    {
+        auto err_resp = ddl_client->query_partition_split(table_name);
+        auto status_map = err_resp.get_value().status;
+        if (is_single_partition(target_pidx)) {
+            return (status_map.find(target_pidx) != status_map.end() &&
+                    status_map[target_pidx] == target_status);
+        }
+
+        // all partitions
+        int32_t finish_count = 0;
+        for (auto i = 0; i < partition_count; ++i) {
+            if (status_map.find(i) != status_map.end() && status_map[i] == 
target_status) {
+                finish_count++;
+            }
+        }
+        return finish_count == partition_count;
+    }
+
+    error_code control_partition_split(const std::string &table_name,
+                                       split_control_type::type type,
+                                       int32_t parent_pidx,
+                                       int32_t old_partition_count = 0)
+    {
+        auto err_resp =
+            ddl_client->control_partition_split(table_name, type, parent_pidx, 
old_partition_count);
+        return err_resp.get_value().err;
+    }
+
+    void write_data_before_split()
+    {
+        std::cout << "Write data before partition split......" << std::endl;
+        for (int32_t i = 0; i < dataset_count; ++i) {
+            std::string hash_key = dataset_hashkey_prefix + std::to_string(i);
+            std::string sort_key = dataset_sortkey_prefix + std::to_string(i);
+            auto ret = pg_client->set(hash_key, sort_key, data_value);
+            ASSERT_EQ(ret, PERR_OK);
+            expected[hash_key][sort_key] = data_value;
+        }
+    }
+
+    void write_data_during_split()
+    {
+        std::string hash = splitting_hashkey_prefix + 
std::to_string(count_during_split);
+        std::string sort = splitting_sortkey_prefix + 
std::to_string(count_during_split);
+        auto ret = pg_client->set(hash, sort, data_value);
+        ASSERT_TRUE((ret == PERR_OK || ret == PERR_TIMEOUT));
+        if (ret == PERR_OK) {
+            expected[hash][sort] = data_value;
+            count_during_split++;
+        }
+    }
+
+    void read_data_during_split()
+    {
+        auto index = rand() % dataset_count;
+        std::string hash = dataset_hashkey_prefix + std::to_string(index);
+        std::string sort = dataset_sortkey_prefix + std::to_string(index);
+        std::string expected_value;
+        auto ret = pg_client->get(hash, sort, expected_value);
+        ASSERT_TRUE((ret == PERR_OK || ret == PERR_TIMEOUT));
+        if (ret == PERR_OK) {
+            ASSERT_EQ(expected_value, data_value);
+        }
+    }
+
+    void verify_data_after_split()
+    {
+        std::cout << "Verify data(count=" << dataset_count + count_during_split
+                  << ") after partition split......" << std::endl;
+        verify_data(dataset_hashkey_prefix, dataset_sortkey_prefix, 
dataset_count);
+        verify_data(splitting_hashkey_prefix, splitting_sortkey_prefix, 
count_during_split);
+    }
+
+    void verify_data(const std::string &hashkey_prefix,
+                     const std::string &sortkey_prefix,
+                     const int32_t count)
+    {
+        for (auto i = 0; i < count; ++i) {
+            std::string hash_key = hashkey_prefix + std::to_string(i);
+            std::string sort_key = sortkey_prefix + std::to_string(i);
+            std::string expected_value;
+            auto ret = pg_client->get(hash_key, sort_key, expected_value);
+            ASSERT_EQ(ret, PERR_OK);
+            ASSERT_EQ(expected[hash_key][sort_key], expected_value);
+        }
+    }
+
+    void hash_scan_during_split(int32_t count)
+    {
+        if (count % 3 != 0) {
+            return;
+        }
+
+        pegasus_client::pegasus_scanner *scanner = nullptr;
+        pegasus_client::scan_options options;
+        auto ret = pg_client->get_scanner(
+            dataset_hashkey_prefix + std::to_string(count), "", "", options, 
scanner);
+        ASSERT_TRUE((ret == PERR_OK || ret == PERR_TIMEOUT));
+        if (ret == PERR_OK) {
+            std::string hash_key;
+            std::string sort_key;
+            std::string expected_value;
+            while (scanner->next(hash_key, sort_key, expected_value) == 0) {
+                ASSERT_EQ(expected[hash_key][sort_key], expected_value);
+            }
+        }
+        delete scanner;
+    }
+
+    void full_scan_after_split()
+    {
+        std::cout << "Start full scan after partition split......" << 
std::endl;
+        std::vector<pegasus_client::pegasus_scanner *> scanners;
+        pegasus_client::scan_options options;
+        auto ret = pg_client->get_unordered_scanners(10000, options, scanners);
+        ASSERT_EQ(ret, PERR_OK);
+        int32_t count;
+        for (auto i = 0; i < scanners.size(); i++) {
+            std::string hash_key;
+            std::string sort_key;
+            std::string expected_value;
+            pegasus_client::internal_info info;
+            pegasus_client::pegasus_scanner *scanner = scanners[i];
+            while (scanner->next(hash_key, sort_key, expected_value, &info) == 
0) {
+                ASSERT_EQ(expected[hash_key][sort_key], expected_value);
+                count++;
+            }
+        }
+        ASSERT_EQ(count, dataset_count);
+        for (auto scanner : scanners) {
+            delete scanner;
+        }
+    }
+
+public:
+    std::shared_ptr<replication_ddl_client> ddl_client;
+    pegasus_client *pg_client;
+    std::unordered_map<std::string, std::unordered_map<std::string, 
std::string>> expected;
+    const int32_t partition_count = 4;
+    const int32_t dataset_count = 1000;
+    const std::string dataset_hashkey_prefix = "hashkey";
+    const std::string dataset_sortkey_prefix = "sortkey";
+    const std::string splitting_hashkey_prefix = "keyh_";
+    const std::string splitting_sortkey_prefix = "keys_";
+    const std::string data_value = "vaule";
+    int32_t count_during_split = 0;
+};
+
+TEST_F(partition_split_test, split_with_write)
+{
+    const std::string table_name = "split_table_1";
+    prepare(table_name);
+
+    // write data during partition split
+    do {
+        write_data_during_split();
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    } while (!is_split_finished(table_name));
+    std::cout << "Partition split succeed" << std::endl;
+
+    verify_data_after_split();
+    cleanup(table_name);
+}
+
+TEST_F(partition_split_test, split_with_read)
+{
+    const std::string table_name = "split_table_2";
+    prepare(table_name);
+
+    // read data during partition split
+    do {
+        read_data_during_split();
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    } while (!is_split_finished(table_name));
+    std::cout << "Partition split succeed" << std::endl;
+    verify_data_after_split();
+    cleanup(table_name);
+}
+
+TEST_F(partition_split_test, split_with_scan)
+{
+    const std::string table_name = "split_table_3";
+    prepare(table_name);
+
+    int32_t count = 0;
+    do {
+        hash_scan_during_split(count);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        ++count;
+    } while (!is_split_finished(table_name));
+    std::cout << "Partition split succeed" << std::endl;
+    verify_data_after_split();
+    std::this_thread::sleep_for(std::chrono::seconds(30));
+    full_scan_after_split();
+    cleanup(table_name);
+}
+
+TEST_F(partition_split_test, pause_split)
+{
+    const std::string table_name = "split_table_4";
+    prepare(table_name);
+
+    bool already_pause = false, already_restart = false;
+    int32_t target_partition = 2, count = 30;
+    do {
+        write_data_during_split();
+        // pause target partition split
+        if (!already_pause &&
+            check_partition_split_status(table_name, -1, 
split_status::SPLITTING)) {
+            error_code error =
+                control_partition_split(table_name, split_control_type::PAUSE, 
target_partition);
+            ASSERT_EQ(ERR_OK, error);
+            std::cout << "Table(" << table_name << ") pause partition[" << 
target_partition
+                      << "] split succeed" << std::endl;
+            already_pause = true;
+        }
+        // restart target partition split
+        if (!already_restart && count_during_split >= count &&
+            check_partition_split_status(table_name, target_partition, 
split_status::PAUSED)) {
+            error_code error =
+                control_partition_split(table_name, 
split_control_type::RESTART, target_partition);
+            ASSERT_EQ(ERR_OK, error);
+            std::cout << "Table(" << table_name << ") restart split 
partition[" << target_partition
+                      << "] succeed" << std::endl;
+            already_restart = true;
+        }
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    } while (!is_split_finished(table_name));
+    std::cout << "Partition split succeed" << std::endl;
+
+    verify_data_after_split();
+    cleanup(table_name);
+}
+
+TEST_F(partition_split_test, cancel_split)
+{
+    const std::string table_name = "split_table_5";
+    prepare(table_name);
+
+    // pause partition split
+    bool already_pause = false;
+    error_code error = ERR_OK;
+    do {
+        write_data_during_split();
+        // pause all partition split
+        if (!already_pause &&
+            check_partition_split_status(table_name, -1, 
split_status::SPLITTING)) {
+            error = control_partition_split(table_name, 
split_control_type::PAUSE, -1);
+            ASSERT_EQ(ERR_OK, error);
+            std::cout << "Table(" << table_name << ") pause all partitions 
split succeed"
+                      << std::endl;
+            already_pause = true;
+        }
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    } while (!check_partition_split_status(table_name, -1, 
split_status::PAUSED));
+
+    // cancel partition split
+    error = control_partition_split(table_name, split_control_type::CANCEL, 
-1, partition_count);
+    ASSERT_EQ(ERR_OK, error);
+    std::cout << "Table(" << table_name << ") cancel partitions split succeed" 
<< std::endl;
+    // write data during cancel partition split
+    do {
+        write_data_during_split();
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    } while (!is_split_finished(table_name));
+
+    verify_data_after_split();
+    cleanup(table_name);
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to