This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 42136fa4b refactor(test): refactor bulk load function test (#1616)
42136fa4b is described below

commit 42136fa4b36288b7597c32c93043541f18b3bba7
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Sep 21 10:10:58 2023 +0800

    refactor(test): refactor bulk load function test (#1616)
    
    https://github.com/apache/incubator-pegasus/issues/887
    
    There are no functional changes; this commit only refactors the bulk_load
    function test.
---
 src/block_service/local/local_service.cpp          |  12 +-
 src/block_service/local/local_service.h            |  10 +
 src/block_service/test/local_service_test.cpp      |   1 +
 src/meta/meta_bulk_load_service.h                  |   4 +
 src/test/function_test/bulk_load/CMakeLists.txt    |   3 +-
 .../function_test/bulk_load/test_bulk_load.cpp     | 341 +++++++++++----------
 6 files changed, 205 insertions(+), 166 deletions(-)

diff --git a/src/block_service/local/local_service.cpp 
b/src/block_service/local/local_service.cpp
index 5eb4b3fba..04cf1cc3d 100644
--- a/src/block_service/local/local_service.cpp
+++ b/src/block_service/local/local_service.cpp
@@ -16,17 +16,16 @@
 // under the License.
 
 #include <errno.h>
-#include <nlohmann/json.hpp>
 #include <algorithm>
-#include <fstream>
 #include <initializer_list>
+#include <istream>
 #include <memory>
 #include <set>
 #include <type_traits>
 #include <utility>
 
 #include "local_service.h"
-#include "nlohmann/detail/macro_scope.hpp"
+#include "nlohmann/json.hpp"
 #include "nlohmann/json_fwd.hpp"
 #include "runtime/task/async_calls.h"
 #include "utils/autoref_ptr.h"
@@ -52,13 +51,6 @@ namespace block_service {
 
 DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON, 
THREAD_POOL_BLOCK_SERVICE)
 
-struct file_metadata
-{
-    uint64_t size;
-    std::string md5;
-};
-NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)
-
 bool file_metadata_from_json(std::ifstream &fin, file_metadata &fmeta) noexcept
 {
     std::string data;
diff --git a/src/block_service/local/local_service.h 
b/src/block_service/local/local_service.h
index 9816734cf..9c944e1d0 100644
--- a/src/block_service/local/local_service.h
+++ b/src/block_service/local/local_service.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <nlohmann/detail/macro_scope.hpp>
+#include <nlohmann/json.hpp>     // IWYU pragma: keep
+#include <nlohmann/json_fwd.hpp> // IWYU pragma: keep
 #include <stdint.h>
 #include <string>
 #include <vector>
@@ -32,6 +35,13 @@ class task_tracker;
 namespace dist {
 namespace block_service {
 
+struct file_metadata
+{
+    int64_t size = 0;
+    std::string md5;
+};
+NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)
+
 class local_service : public block_filesystem
 {
 public:
diff --git a/src/block_service/test/local_service_test.cpp 
b/src/block_service/test/local_service_test.cpp
index e355a1b28..72a201237 100644
--- a/src/block_service/test/local_service_test.cpp
+++ b/src/block_service/test/local_service_test.cpp
@@ -26,6 +26,7 @@
 #include <nlohmann/json_fwd.hpp>
 #include <fstream>
 #include <initializer_list>
+#include <stdexcept>
 #include <vector>
 
 #include "block_service/local/local_service.h"
diff --git a/src/meta/meta_bulk_load_service.h 
b/src/meta/meta_bulk_load_service.h
index ba151757a..c411d87f4 100644
--- a/src/meta/meta_bulk_load_service.h
+++ b/src/meta/meta_bulk_load_service.h
@@ -97,6 +97,10 @@ struct bulk_load_info
     int32_t app_id;
     std::string app_name;
     int32_t partition_count;
+    bulk_load_info(int32_t id = 0, const std::string &name = "", int32_t 
pcount = 0)
+        : app_id(id), app_name(name), partition_count(pcount)
+    {
+    }
     DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count)
 };
 
diff --git a/src/test/function_test/bulk_load/CMakeLists.txt 
b/src/test/function_test/bulk_load/CMakeLists.txt
index 92a677ddf..a6eac0851 100644
--- a/src/test/function_test/bulk_load/CMakeLists.txt
+++ b/src/test/function_test/bulk_load/CMakeLists.txt
@@ -37,7 +37,8 @@ set(MY_PROJ_LIBS
     gssapi_krb5
     krb5
     function_test_utils
-    )
+    rocksdb
+    test_utils)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)
 
diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp 
b/src/test/function_test/bulk_load/test_bulk_load.cpp
index f350c6945..28e52858f 100644
--- a/src/test/function_test/bulk_load/test_bulk_load.cpp
+++ b/src/test/function_test/bulk_load/test_bulk_load.cpp
@@ -15,9 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <fmt/core.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
 #include <algorithm>
 #include <chrono>
 #include <cstdint>
@@ -26,20 +32,24 @@
 #include <memory>
 #include <string>
 #include <thread>
-#include <vector>
 
 #include "base/pegasus_const.h"
+#include "block_service/local/local_service.h"
 #include "bulk_load_types.h"
+#include "client/partition_resolver.h"
 #include "client/replication_ddl_client.h"
+#include "common/json_helper.h"
 #include "include/pegasus/client.h"
 #include "include/pegasus/error.h"
+#include "meta/meta_bulk_load_service.h"
 #include "meta_admin_types.h"
-#include "metadata_types.h"
 #include "test/function_test/utils/test_util.h"
+#include "utils/blob.h"
+#include "utils/enum_helper.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/filesystem.h"
-#include "utils/utils.h"
+#include "utils/test_macros.h"
 
 using namespace ::dsn;
 using namespace ::dsn::replication;
@@ -55,12 +65,12 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
@@ -68,80 +78,102 @@ protected:
     bulk_load_test() : test_util(map<string, 
string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            
utils::filesystem::path_combine("onebox/block_service/local_service/", 
LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf 
onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 
'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string 
&bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p 
onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p 
onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r 
src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     
"onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        auto value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value.data(), 
value.length()),
+                                            bulk_load_info_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    error_code start_bulk_load(bool ingest_behind = false)
+    // Generate the '.bulk_load_info.meta' file according to the 
'bulk_load_info' file
+    // in path 'bulk_load_info_path'.
+    void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
     {
-        auto err_resp =
-            ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, 
LOCAL_ROOT, ingest_behind);
-        return err_resp.get_value().err;
+        dist::block_service::file_metadata fm;
+        ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, 
fm.size));
+        ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, 
fm.md5));
+        std::string value = nlohmann::json(fm).dump();
+        auto bulk_load_info_meta_path =
+            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, 
kCluster, app_name_);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value),
+                                            bulk_load_info_meta_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    void remove_file(const string &file_path)
+    void copy_bulk_load_files()
+    {
+        // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we 
can generate it.
+        // Prepare bulk load files.
+        // The source data has 8 partitions.
+        ASSERT_EQ(8, partition_count_);
+        NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
+        NO_FATALS(run_cmd_from_project_root(
+            fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad, 
kLocalServiceRoot)));
+
+        // Generate 'bulk_load_info'.
+        auto bulk_load_info_path =
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, app_name_);
+        NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, 
partition_count_),
+                                          bulk_load_info_path));
+
+        // Generate '.bulk_load_info.meta'.
+        NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+    }
+
+    error_code start_bulk_load(bool ingest_behind = false)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm " + file_path));
+        return ddl_client_
+            ->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad, 
ingest_behind)
+            .get_value()
+            .err;
     }
 
-    void replace_bulk_load_info()
+    void remove_file(const string &file_path)
     {
-        string cmd = "cp -R "
-                     
"src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/"
-                     "mock_bulk_load_info/. " +
-                     bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + 
"/";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        NO_FATALS(run_cmd_from_project_root("rm " + file_path));
     }
 
     void update_allow_ingest_behind(const string &allow_ingest_behind)
     {
-        // update app envs
-        std::vector<string> keys;
-        keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
-        std::vector<string> values;
-        values.emplace_back(allow_ingest_behind);
-        ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys, 
values).get_value().err);
+        const auto ret = ddl_client_->set_app_envs(
+            app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind});
+        ASSERT_EQ(ERR_OK, ret.get_value().err);
         std::cout << "sleep 31s to wait app_envs update" << std::endl;
         std::this_thread::sleep_for(std::chrono::seconds(31));
     }
 
-    bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
+    bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds)
     {
         int64_t sleep_time = 5;
-        error_code err = ERR_OK;
+        auto err = ERR_OK;
 
-        bulk_load_status::type last_status = bulk_load_status::BLS_INVALID;
+        auto last_status = bulk_load_status::BLS_INVALID;
         // when bulk load end, err will be ERR_INVALID_STATE
-        while (seconds > 0 && err == ERR_OK) {
-            sleep_time = sleep_time > seconds ? seconds : sleep_time;
-            seconds -= sleep_time;
+        while (remain_seconds > 0 && err == ERR_OK) {
+            sleep_time = std::min(sleep_time, remain_seconds);
+            remain_seconds -= sleep_time;
             std::cout << "sleep " << sleep_time << "s to query bulk status" << 
std::endl;
             std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
 
@@ -156,52 +188,51 @@ protected:
 
     void verify_bulk_load_data()
     {
-        ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
-        ASSERT_NO_FATAL_FAILURE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, 
kBulkLoadSortKeyPrefix1));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix2, 
kBulkLoadSortKeyPrefix2));
     }
 
     void verify_data(const string &hashkey_prefix, const string 
&sortkey_prefix)
     {
-        const string &expected_value = VALUE;
-        for (int i = 0; i < COUNT; ++i) {
+        for (int i = 0; i < kBulkLoadItemCount; ++i) {
             string hash_key = hashkey_prefix + std::to_string(i);
-            for (int j = 0; j < COUNT; ++j) {
+            for (int j = 0; j < kBulkLoadItemCount; ++j) {
                 string sort_key = sortkey_prefix + std::to_string(j);
-                string act_value;
-                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, 
act_value)) << hash_key << ","
-                                                                               
 << sort_key;
-                ASSERT_EQ(expected_value, act_value) << hash_key << "," << 
sort_key;
+                string actual_value;
+                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, 
actual_value))
+                    << hash_key << "," << sort_key;
+                ASSERT_EQ(kBulkLoadValue, actual_value) << hash_key << "," << 
sort_key;
             }
         }
     }
 
-    enum operation
+    enum class operation
     {
         GET,
         SET,
         DEL,
         NO_VALUE
     };
-    void operate_data(bulk_load_test::operation op, const string &value, int 
count)
+    void operate_data(operation op, const string &value, int count)
     {
         for (int i = 0; i < count; ++i) {
-            string hash_key = HASHKEY_PREFIX + std::to_string(i);
-            string sort_key = SORTKEY_PREFIX + std::to_string(i);
+            auto hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);
+            auto sort_key = fmt::format("{}{}", kBulkLoadSortKeyPrefix2, i);
             switch (op) {
-            case bulk_load_test::operation::GET: {
-                string act_value;
-                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, 
act_value));
-                ASSERT_EQ(value, act_value);
+            case operation::GET: {
+                string actual_value;
+                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, 
actual_value));
+                ASSERT_EQ(value, actual_value);
             } break;
-            case bulk_load_test::operation::DEL: {
+            case operation::DEL: {
                 ASSERT_EQ(PERR_OK, client_->del(hash_key, sort_key));
             } break;
-            case bulk_load_test::operation::SET: {
+            case operation::SET: {
                 ASSERT_EQ(PERR_OK, client_->set(hash_key, sort_key, value));
             } break;
-            case bulk_load_test::operation::NO_VALUE: {
-                string act_value;
-                ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key, 
act_value));
+            case operation::NO_VALUE: {
+                string actual_value;
+                ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key, 
actual_value));
             } break;
             default:
                 ASSERT_TRUE(false);
@@ -210,108 +241,108 @@ protected:
         }
     }
 
-protected:
-    string bulk_load_local_root_;
+    void check_bulk_load(bool ingest_behind,
+                         const std::string &value_before_bulk_load,
+                         const std::string &value_after_bulk_load)
+    {
+        // Write some data before bulk load.
+        NO_FATALS(operate_data(operation::SET, value_before_bulk_load, 10));
+        NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));
+
+        // Start bulk load and wait until it complete.
+        ASSERT_EQ(ERR_OK, start_bulk_load(ingest_behind));
+        ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
+
+        std::cout << "Start to verify data..." << std::endl;
+        if (ingest_behind) {
+            // Values have NOT been overwritten by the bulk load data.
+            NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 
10));
+            NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, 
kBulkLoadSortKeyPrefix1));
+        } else {
+            // Values have been overwritten by the bulk load data.
+            NO_FATALS(operate_data(operation::GET, kBulkLoadValue, 10));
+            NO_FATALS(verify_bulk_load_data());
+        }
 
-    const string LOCAL_ROOT = "bulk_load_root";
-    const string CLUSTER = "cluster";
-    const string PROVIDER = "local_service";
+        // Write new data succeed after bulk load.
+        NO_FATALS(operate_data(operation::SET, value_after_bulk_load, 20));
+        NO_FATALS(operate_data(operation::GET, value_after_bulk_load, 20));
 
-    const string HASHKEY_PREFIX = "hash";
-    const string SORTKEY_PREFIX = "sort";
-    const string VALUE = "newValue";
-    const int32_t COUNT = 1000;
+        // Delete data succeed after bulk load.
+        NO_FATALS(operate_data(operation::DEL, "", 15));
+        NO_FATALS(operate_data(operation::NO_VALUE, "", 15));
+    }
+
+protected:
+    string bulk_load_local_app_root_;
+    const string kSourceFilesRoot =
+        
"src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files";
+    const string kLocalServiceRoot = "onebox/block_service/local_service";
+    const string kLocalBulkLoadRoot = 
"onebox/block_service/local_service/bulk_load_root";
+    const string kBulkLoad = "bulk_load_root";
+    const string kCluster = "cluster";
+    const string kProvider = "local_service";
+
+    const int32_t kBulkLoadItemCount = 1000;
+    const string kBulkLoadHashKeyPrefix1 = "hashkey";
+    const string kBulkLoadSortKeyPrefix1 = "sortkey";
+    const string kBulkLoadValue = "newValue";
+
+    // Real time write operations will use this prefix as well.
+    const string kBulkLoadHashKeyPrefix2 = "hash";
+    const string kBulkLoadSortKeyPrefix2 = "sort";
 };
 
-///
-/// case1: lack of `bulk_load_info` file
-/// case2: `bulk_load_info` file inconsistent with app_info
-///
-TEST_F(bulk_load_test, bulk_load_test_failed)
+// Test bulk load failed because the 'bulk_load_info' file is missing
+TEST_F(bulk_load_test, missing_bulk_load_info)
 {
-    // bulk load failed because `bulk_load_info` file is missing
-    ASSERT_NO_FATAL_FAILURE(
-        remove_file(bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + 
"/bulk_load_info"));
+    NO_FATALS(remove_file(bulk_load_local_app_root_ + "/bulk_load_info"));
     ASSERT_EQ(ERR_OBJECT_NOT_FOUND, start_bulk_load());
+}
 
-    // bulk load failed because `bulk_load_info` file inconsistent with 
current app_info
-    ASSERT_NO_FATAL_FAILURE(replace_bulk_load_info());
-    ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load());
+// Test bulk load failed because the 'bulk_load_info' file is inconsistent 
with the actual app info.
+TEST_F(bulk_load_test, inconsistent_bulk_load_info)
+{
+    // Only 'app_id' and 'partition_count' will be checked in Pegasus server, 
so just inject these
+    // kind of inconsistencies.
+    bulk_load_info tests[] = {{app_id_ + 1, app_name_, partition_count_},
+                              {app_id_, app_name_, partition_count_ * 2}};
+    for (const auto &test : tests) {
+        // Generate inconsistent 'bulk_load_info'.
+        auto bulk_load_info_path =
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, 
kCluster, app_name_);
+        NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path));
+
+        // Generate '.bulk_load_info.meta'.
+        NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+
+        ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load()) << test.app_id << 
"," << test.app_name
+                                                             << "," << 
test.partition_count;
+    }
 }
 
-///
-/// case1: lack of `bulk_load_metadata` file
-/// case2: bulk load succeed with data verfied
-/// case3: bulk load data consistent:
-///     - old data will be overrided by bulk load data
-///     - get/set/del succeed after bulk load
-///
-TEST_F(bulk_load_test, bulk_load_tests)
+// Test bulk load failed because partition[0]'s 'bulk_load_metadata' file is 
missing.
+TEST_F(bulk_load_test, missing_p0_bulk_load_metadata)
 {
-    // bulk load failed because partition[0] `bulk_load_metadata` file is 
missing
-    ASSERT_NO_FATAL_FAILURE(remove_file(bulk_load_local_root_ + "/" + CLUSTER 
+ "/" + app_name_ +
-                                        "/0/bulk_load_metadata"));
+    NO_FATALS(remove_file(bulk_load_local_app_root_ + 
"/0/bulk_load_metadata"));
     ASSERT_EQ(ERR_OK, start_bulk_load());
-    // bulk load will get FAILED
     ASSERT_EQ(bulk_load_status::BLS_FAILED, wait_bulk_load_finish(300));
-
-    // recover complete files
-    ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
-
-    // write old data
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
-
-    ASSERT_EQ(ERR_OK, start_bulk_load());
-    ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
-    std::cout << "Start to verify data..." << std::endl;
-    ASSERT_NO_FATAL_FAILURE(verify_bulk_load_data());
-
-    // value overide by bulk_loaded_data
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, VALUE, 10));
-
-    // write data after bulk load succeed
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad", 
20));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad", 
20));
-
-    // del data after bulk load succeed
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15));
 }
 
-///
-/// case1: inconsistent ingest_behind
-/// case2: bulk load(ingest_behind) succeed with data verfied
-/// case3: bulk load data consistent:
-///     - bulk load data will be overrided by old data
-///     - get/set/del succeed after bulk load
-///
-TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
+// Test bulk load failed because the allow_ingest_behind config is 
inconsistent.
+TEST_F(bulk_load_test, allow_ingest_behind_inconsistent)
 {
-    ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("false"));
-
-    // app envs allow_ingest_behind = false, request ingest_behind = true
+    NO_FATALS(update_allow_ingest_behind("false"));
     ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load(true));
+}
 
-    ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("true"));
-
-    // write old data
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
-
-    ASSERT_EQ(ERR_OK, start_bulk_load(true));
-    ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
-
-    std::cout << "Start to verify data..." << std::endl;
-    // value overide by bulk_loaded_data
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
-    ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
-
-    // write data after bulk load succeed
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad", 
20));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad", 
20));
+// Test normal bulk load, old data will be overwritten by bulk load data.
+TEST_F(bulk_load_test, normal) { check_bulk_load(false, "oldValue", 
"valueAfterBulkLoad"); }
 
-    // del data after bulk load succeed
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15));
-    ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15));
+// Test normal bulk load with allow_ingest_behind=true, old data will NOT be 
overwritten by bulk
+// load data.
+TEST_F(bulk_load_test, allow_ingest_behind)
+{
+    NO_FATALS(update_allow_ingest_behind("true"));
+    check_bulk_load(true, "oldValue", "valueAfterBulkLoad");
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to