This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 42136fa4b refactor(test): refactor bulk load function test (#1616)
42136fa4b is described below
commit 42136fa4b36288b7597c32c93043541f18b3bba7
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Sep 21 10:10:58 2023 +0800
refactor(test): refactor bulk load function test (#1616)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this only refactors the bulk_load function
test.
---
src/block_service/local/local_service.cpp | 12 +-
src/block_service/local/local_service.h | 10 +
src/block_service/test/local_service_test.cpp | 1 +
src/meta/meta_bulk_load_service.h | 4 +
src/test/function_test/bulk_load/CMakeLists.txt | 3 +-
.../function_test/bulk_load/test_bulk_load.cpp | 341 +++++++++++----------
6 files changed, 205 insertions(+), 166 deletions(-)
diff --git a/src/block_service/local/local_service.cpp
b/src/block_service/local/local_service.cpp
index 5eb4b3fba..04cf1cc3d 100644
--- a/src/block_service/local/local_service.cpp
+++ b/src/block_service/local/local_service.cpp
@@ -16,17 +16,16 @@
// under the License.
#include <errno.h>
-#include <nlohmann/json.hpp>
#include <algorithm>
-#include <fstream>
#include <initializer_list>
+#include <istream>
#include <memory>
#include <set>
#include <type_traits>
#include <utility>
#include "local_service.h"
-#include "nlohmann/detail/macro_scope.hpp"
+#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
#include "runtime/task/async_calls.h"
#include "utils/autoref_ptr.h"
@@ -52,13 +51,6 @@ namespace block_service {
DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON,
THREAD_POOL_BLOCK_SERVICE)
-struct file_metadata
-{
- uint64_t size;
- std::string md5;
-};
-NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)
-
bool file_metadata_from_json(std::ifstream &fin, file_metadata &fmeta) noexcept
{
std::string data;
diff --git a/src/block_service/local/local_service.h
b/src/block_service/local/local_service.h
index 9816734cf..9c944e1d0 100644
--- a/src/block_service/local/local_service.h
+++ b/src/block_service/local/local_service.h
@@ -17,6 +17,9 @@
#pragma once
+#include <nlohmann/detail/macro_scope.hpp>
+#include <nlohmann/json.hpp> // IWYU pragma: keep
+#include <nlohmann/json_fwd.hpp> // IWYU pragma: keep
#include <stdint.h>
#include <string>
#include <vector>
@@ -32,6 +35,13 @@ class task_tracker;
namespace dist {
namespace block_service {
+struct file_metadata
+{
+ int64_t size = 0;
+ std::string md5;
+};
+NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(file_metadata, size, md5)
+
class local_service : public block_filesystem
{
public:
diff --git a/src/block_service/test/local_service_test.cpp
b/src/block_service/test/local_service_test.cpp
index e355a1b28..72a201237 100644
--- a/src/block_service/test/local_service_test.cpp
+++ b/src/block_service/test/local_service_test.cpp
@@ -26,6 +26,7 @@
#include <nlohmann/json_fwd.hpp>
#include <fstream>
#include <initializer_list>
+#include <stdexcept>
#include <vector>
#include "block_service/local/local_service.h"
diff --git a/src/meta/meta_bulk_load_service.h
b/src/meta/meta_bulk_load_service.h
index ba151757a..c411d87f4 100644
--- a/src/meta/meta_bulk_load_service.h
+++ b/src/meta/meta_bulk_load_service.h
@@ -97,6 +97,10 @@ struct bulk_load_info
int32_t app_id;
std::string app_name;
int32_t partition_count;
+ bulk_load_info(int32_t id = 0, const std::string &name = "", int32_t
pcount = 0)
+ : app_id(id), app_name(name), partition_count(pcount)
+ {
+ }
DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count)
};
diff --git a/src/test/function_test/bulk_load/CMakeLists.txt
b/src/test/function_test/bulk_load/CMakeLists.txt
index 92a677ddf..a6eac0851 100644
--- a/src/test/function_test/bulk_load/CMakeLists.txt
+++ b/src/test/function_test/bulk_load/CMakeLists.txt
@@ -37,7 +37,8 @@ set(MY_PROJ_LIBS
gssapi_krb5
krb5
function_test_utils
- )
+ rocksdb
+ test_utils)
set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)
diff --git a/src/test/function_test/bulk_load/test_bulk_load.cpp
b/src/test/function_test/bulk_load/test_bulk_load.cpp
index f350c6945..28e52858f 100644
--- a/src/test/function_test/bulk_load/test_bulk_load.cpp
+++ b/src/test/function_test/bulk_load/test_bulk_load.cpp
@@ -15,9 +15,15 @@
// specific language governing permissions and limitations
// under the License.
+#include <fmt/core.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
@@ -26,20 +32,24 @@
#include <memory>
#include <string>
#include <thread>
-#include <vector>
#include "base/pegasus_const.h"
+#include "block_service/local/local_service.h"
#include "bulk_load_types.h"
+#include "client/partition_resolver.h"
#include "client/replication_ddl_client.h"
+#include "common/json_helper.h"
#include "include/pegasus/client.h"
#include "include/pegasus/error.h"
+#include "meta/meta_bulk_load_service.h"
#include "meta_admin_types.h"
-#include "metadata_types.h"
#include "test/function_test/utils/test_util.h"
+#include "utils/blob.h"
+#include "utils/enum_helper.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/filesystem.h"
-#include "utils/utils.h"
+#include "utils/test_macros.h"
using namespace ::dsn;
using namespace ::dsn::replication;
@@ -55,12 +65,12 @@ using std::string;
/// - `bulk_load_root` sub-directory stores right data
/// - Please do not rename any files or directories under this folder
///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
///
/// Data:
-/// hashkey: hashi sortkey: sorti value: newValue i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
///
class bulk_load_test : public test_util
{
@@ -68,80 +78,102 @@ protected:
bulk_load_test() : test_util(map<string,
string>({{"rocksdb.allow_ingest_behind", "true"}}))
{
TRICKY_CODE_TO_AVOID_LINK_ERROR;
- bulk_load_local_root_ =
-
utils::filesystem::path_combine("onebox/block_service/local_service/",
LOCAL_ROOT);
+ bulk_load_local_app_root_ =
+ fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
}
void SetUp() override
{
test_util::SetUp();
- ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+ NO_FATALS(copy_bulk_load_files());
}
void TearDown() override
{
ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf
onebox/block_service"));
+ NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
}
- void copy_bulk_load_files()
+ // Generate the 'bulk_load_info' file according to 'bli' to path
'bulk_load_info_path'.
+ void generate_bulk_load_info(const bulk_load_info &bli, const std::string
&bulk_load_info_path)
{
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p
onebox/block_service"));
- ASSERT_NO_FATAL_FAILURE(
- run_cmd_from_project_root("mkdir -p
onebox/block_service/local_service"));
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
- "cp -r
src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
- LOCAL_ROOT + " onebox/block_service/local_service"));
- string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
- ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-
"onebox/block_service/local_service/bulk_load_root/cluster/temp/"
- "bulk_load_info";
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+ auto value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(value.data(),
value.length()),
+ bulk_load_info_path,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
}
- error_code start_bulk_load(bool ingest_behind = false)
+ // Generate the '.bulk_load_info.meta' file according to the
'bulk_load_info' file
+ // in path 'bulk_load_info_path'.
+ void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
{
- auto err_resp =
- ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER,
LOCAL_ROOT, ingest_behind);
- return err_resp.get_value().err;
+ dist::block_service::file_metadata fm;
+ ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path,
fm.size));
+ ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path,
fm.md5));
+ std::string value = nlohmann::json(fm).dump();
+ auto bulk_load_info_meta_path =
+ fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot,
kCluster, app_name_);
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(value),
+ bulk_load_info_meta_path,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
}
- void remove_file(const string &file_path)
+ void copy_bulk_load_files()
+ {
+ // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we
can generate it.
+ // Prepare bulk load files.
+ // The source data has 8 partitions.
+ ASSERT_EQ(8, partition_count_);
+ NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
+ NO_FATALS(run_cmd_from_project_root(
+ fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad,
kLocalServiceRoot)));
+
+ // Generate 'bulk_load_info'.
+ auto bulk_load_info_path =
+ fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot,
kCluster, app_name_);
+ NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_,
partition_count_),
+ bulk_load_info_path));
+
+ // Generate '.bulk_load_info.meta'.
+ NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+ }
+
+ error_code start_bulk_load(bool ingest_behind = false)
{
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm " + file_path));
+ return ddl_client_
+ ->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad,
ingest_behind)
+ .get_value()
+ .err;
}
- void replace_bulk_load_info()
+ void remove_file(const string &file_path)
{
- string cmd = "cp -R "
-
"src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/"
- "mock_bulk_load_info/. " +
- bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ +
"/";
- ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+ NO_FATALS(run_cmd_from_project_root("rm " + file_path));
}
void update_allow_ingest_behind(const string &allow_ingest_behind)
{
- // update app envs
- std::vector<string> keys;
- keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
- std::vector<string> values;
- values.emplace_back(allow_ingest_behind);
- ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys,
values).get_value().err);
+ const auto ret = ddl_client_->set_app_envs(
+ app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind});
+ ASSERT_EQ(ERR_OK, ret.get_value().err);
std::cout << "sleep 31s to wait app_envs update" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(31));
}
- bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
+ bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds)
{
int64_t sleep_time = 5;
- error_code err = ERR_OK;
+ auto err = ERR_OK;
- bulk_load_status::type last_status = bulk_load_status::BLS_INVALID;
+ auto last_status = bulk_load_status::BLS_INVALID;
// when bulk load end, err will be ERR_INVALID_STATE
- while (seconds > 0 && err == ERR_OK) {
- sleep_time = sleep_time > seconds ? seconds : sleep_time;
- seconds -= sleep_time;
+ while (remain_seconds > 0 && err == ERR_OK) {
+ sleep_time = std::min(sleep_time, remain_seconds);
+ remain_seconds -= sleep_time;
std::cout << "sleep " << sleep_time << "s to query bulk status" <<
std::endl;
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
@@ -156,52 +188,51 @@ protected:
void verify_bulk_load_data()
{
- ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
- ASSERT_NO_FATAL_FAILURE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
+ NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1,
kBulkLoadSortKeyPrefix1));
+ NO_FATALS(verify_data(kBulkLoadHashKeyPrefix2,
kBulkLoadSortKeyPrefix2));
}
void verify_data(const string &hashkey_prefix, const string
&sortkey_prefix)
{
- const string &expected_value = VALUE;
- for (int i = 0; i < COUNT; ++i) {
+ for (int i = 0; i < kBulkLoadItemCount; ++i) {
string hash_key = hashkey_prefix + std::to_string(i);
- for (int j = 0; j < COUNT; ++j) {
+ for (int j = 0; j < kBulkLoadItemCount; ++j) {
string sort_key = sortkey_prefix + std::to_string(j);
- string act_value;
- ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key,
act_value)) << hash_key << ","
-
<< sort_key;
- ASSERT_EQ(expected_value, act_value) << hash_key << "," <<
sort_key;
+ string actual_value;
+ ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key,
actual_value))
+ << hash_key << "," << sort_key;
+ ASSERT_EQ(kBulkLoadValue, actual_value) << hash_key << "," <<
sort_key;
}
}
}
- enum operation
+ enum class operation
{
GET,
SET,
DEL,
NO_VALUE
};
- void operate_data(bulk_load_test::operation op, const string &value, int
count)
+ void operate_data(operation op, const string &value, int count)
{
for (int i = 0; i < count; ++i) {
- string hash_key = HASHKEY_PREFIX + std::to_string(i);
- string sort_key = SORTKEY_PREFIX + std::to_string(i);
+ auto hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);
+ auto sort_key = fmt::format("{}{}", kBulkLoadSortKeyPrefix2, i);
switch (op) {
- case bulk_load_test::operation::GET: {
- string act_value;
- ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key,
act_value));
- ASSERT_EQ(value, act_value);
+ case operation::GET: {
+ string actual_value;
+ ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key,
actual_value));
+ ASSERT_EQ(value, actual_value);
} break;
- case bulk_load_test::operation::DEL: {
+ case operation::DEL: {
ASSERT_EQ(PERR_OK, client_->del(hash_key, sort_key));
} break;
- case bulk_load_test::operation::SET: {
+ case operation::SET: {
ASSERT_EQ(PERR_OK, client_->set(hash_key, sort_key, value));
} break;
- case bulk_load_test::operation::NO_VALUE: {
- string act_value;
- ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key,
act_value));
+ case operation::NO_VALUE: {
+ string actual_value;
+ ASSERT_EQ(PERR_NOT_FOUND, client_->get(hash_key, sort_key,
actual_value));
} break;
default:
ASSERT_TRUE(false);
@@ -210,108 +241,108 @@ protected:
}
}
-protected:
- string bulk_load_local_root_;
+ void check_bulk_load(bool ingest_behind,
+ const std::string &value_before_bulk_load,
+ const std::string &value_after_bulk_load)
+ {
+ // Write some data before bulk load.
+ NO_FATALS(operate_data(operation::SET, value_before_bulk_load, 10));
+ NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));
+
+ // Start bulk load and wait until it complete.
+ ASSERT_EQ(ERR_OK, start_bulk_load(ingest_behind));
+ ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
+
+ std::cout << "Start to verify data..." << std::endl;
+ if (ingest_behind) {
+ // Values have NOT been overwritten by the bulk load data.
+ NO_FATALS(operate_data(operation::GET, value_before_bulk_load,
10));
+ NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1,
kBulkLoadSortKeyPrefix1));
+ } else {
+ // Values have been overwritten by the bulk load data.
+ NO_FATALS(operate_data(operation::GET, kBulkLoadValue, 10));
+ NO_FATALS(verify_bulk_load_data());
+ }
- const string LOCAL_ROOT = "bulk_load_root";
- const string CLUSTER = "cluster";
- const string PROVIDER = "local_service";
+ // Write new data succeed after bulk load.
+ NO_FATALS(operate_data(operation::SET, value_after_bulk_load, 20));
+ NO_FATALS(operate_data(operation::GET, value_after_bulk_load, 20));
- const string HASHKEY_PREFIX = "hash";
- const string SORTKEY_PREFIX = "sort";
- const string VALUE = "newValue";
- const int32_t COUNT = 1000;
+ // Delete data succeed after bulk load.
+ NO_FATALS(operate_data(operation::DEL, "", 15));
+ NO_FATALS(operate_data(operation::NO_VALUE, "", 15));
+ }
+
+protected:
+ string bulk_load_local_app_root_;
+ const string kSourceFilesRoot =
+
"src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files";
+ const string kLocalServiceRoot = "onebox/block_service/local_service";
+ const string kLocalBulkLoadRoot =
"onebox/block_service/local_service/bulk_load_root";
+ const string kBulkLoad = "bulk_load_root";
+ const string kCluster = "cluster";
+ const string kProvider = "local_service";
+
+ const int32_t kBulkLoadItemCount = 1000;
+ const string kBulkLoadHashKeyPrefix1 = "hashkey";
+ const string kBulkLoadSortKeyPrefix1 = "sortkey";
+ const string kBulkLoadValue = "newValue";
+
+ // Real time write operations will use this prefix as well.
+ const string kBulkLoadHashKeyPrefix2 = "hash";
+ const string kBulkLoadSortKeyPrefix2 = "sort";
};
-///
-/// case1: lack of `bulk_load_info` file
-/// case2: `bulk_load_info` file inconsistent with app_info
-///
-TEST_F(bulk_load_test, bulk_load_test_failed)
+// Test bulk load failed because the 'bulk_load_info' file is missing
+TEST_F(bulk_load_test, missing_bulk_load_info)
{
- // bulk load failed because `bulk_load_info` file is missing
- ASSERT_NO_FATAL_FAILURE(
- remove_file(bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ +
"/bulk_load_info"));
+ NO_FATALS(remove_file(bulk_load_local_app_root_ + "/bulk_load_info"));
ASSERT_EQ(ERR_OBJECT_NOT_FOUND, start_bulk_load());
+}
- // bulk load failed because `bulk_load_info` file inconsistent with
current app_info
- ASSERT_NO_FATAL_FAILURE(replace_bulk_load_info());
- ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load());
+// Test bulk load failed because the 'bulk_load_info' file is inconsistent
with the actual app info.
+TEST_F(bulk_load_test, inconsistent_bulk_load_info)
+{
+ // Only 'app_id' and 'partition_count' will be checked in Pegasus server,
so just inject these
+ // kind of inconsistencies.
+ bulk_load_info tests[] = {{app_id_ + 1, app_name_, partition_count_},
+ {app_id_, app_name_, partition_count_ * 2}};
+ for (const auto &test : tests) {
+ // Generate inconsistent 'bulk_load_info'.
+ auto bulk_load_info_path =
+ fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot,
kCluster, app_name_);
+ NO_FATALS(generate_bulk_load_info(test, bulk_load_info_path));
+
+ // Generate '.bulk_load_info.meta'.
+ NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+
+ ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load()) << test.app_id <<
"," << test.app_name
+ << "," <<
test.partition_count;
+ }
}
-///
-/// case1: lack of `bulk_load_metadata` file
-/// case2: bulk load succeed with data verfied
-/// case3: bulk load data consistent:
-/// - old data will be overrided by bulk load data
-/// - get/set/del succeed after bulk load
-///
-TEST_F(bulk_load_test, bulk_load_tests)
+// Test bulk load failed because partition[0]'s 'bulk_load_metadata' file is
missing.
+TEST_F(bulk_load_test, missing_p0_bulk_load_metadata)
{
- // bulk load failed because partition[0] `bulk_load_metadata` file is
missing
- ASSERT_NO_FATAL_FAILURE(remove_file(bulk_load_local_root_ + "/" + CLUSTER
+ "/" + app_name_ +
- "/0/bulk_load_metadata"));
+ NO_FATALS(remove_file(bulk_load_local_app_root_ +
"/0/bulk_load_metadata"));
ASSERT_EQ(ERR_OK, start_bulk_load());
- // bulk load will get FAILED
ASSERT_EQ(bulk_load_status::BLS_FAILED, wait_bulk_load_finish(300));
-
- // recover complete files
- ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
-
- // write old data
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
-
- ASSERT_EQ(ERR_OK, start_bulk_load());
- ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
- std::cout << "Start to verify data..." << std::endl;
- ASSERT_NO_FATAL_FAILURE(verify_bulk_load_data());
-
- // value overide by bulk_loaded_data
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, VALUE, 10));
-
- // write data after bulk load succeed
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad",
20));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad",
20));
-
- // del data after bulk load succeed
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15));
}
-///
-/// case1: inconsistent ingest_behind
-/// case2: bulk load(ingest_behind) succeed with data verfied
-/// case3: bulk load data consistent:
-/// - bulk load data will be overrided by old data
-/// - get/set/del succeed after bulk load
-///
-TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
+// Test bulk load failed because the allow_ingest_behind config is
inconsistent.
+TEST_F(bulk_load_test, allow_ingest_behind_inconsistent)
{
- ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("false"));
-
- // app envs allow_ingest_behind = false, request ingest_behind = true
+ NO_FATALS(update_allow_ingest_behind("false"));
ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load(true));
+}
- ASSERT_NO_FATAL_FAILURE(update_allow_ingest_behind("true"));
-
- // write old data
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "oldValue", 10));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
-
- ASSERT_EQ(ERR_OK, start_bulk_load(true));
- ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
-
- std::cout << "Start to verify data..." << std::endl;
- // value overide by bulk_loaded_data
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "oldValue", 10));
- ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
-
- // write data after bulk load succeed
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::SET, "valueAfterBulkLoad",
20));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::GET, "valueAfterBulkLoad",
20));
+// Test normal bulk load, old data will be overwritten by bulk load data.
+TEST_F(bulk_load_test, normal) { check_bulk_load(false, "oldValue",
"valueAfterBulkLoad"); }
- // del data after bulk load succeed
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::DEL, "", 15));
- ASSERT_NO_FATAL_FAILURE(operate_data(operation::NO_VALUE, "", 15));
+// Test normal bulk load with allow_ingest_behind=true, old data will NOT be
overwritten by bulk
+// load data.
+TEST_F(bulk_load_test, allow_ingest_behind)
+{
+ NO_FATALS(update_allow_ingest_behind("true"));
+ check_bulk_load(true, "oldValue", "valueAfterBulkLoad");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]