This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4939d8c6d1e branch-3.0: [Enhancement](recycler) Add some UT for
recycler #47739 (#47803)
4939d8c6d1e is described below
commit 4939d8c6d1e8f8d59b2cbeafb586bde99b6a8826
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 12:11:09 2025 +0800
branch-3.0: [Enhancement](recycler) Add some UT for recycler #47739 (#47803)
Cherry-picked from #47739
Co-authored-by: abmdocrt <[email protected]>
---
cloud/src/recycler/recycler.cpp | 6 +-
cloud/src/recycler/s3_accessor.cpp | 2 +
cloud/test/hdfs_accessor_test.cpp | 4 +
cloud/test/recycler_test.cpp | 310 +++++++++++++++++++++++++++++++++++++
4 files changed, 320 insertions(+), 2 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 633a0e081c6..52597fad04b 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -282,8 +282,8 @@ void Recycler::recycle_callback() {
recycling_instance_map_.erase(instance_id);
}
auto elpased_ms =
- ctime_ms -
-
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
+ ctime_ms;
LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
.tag("cost_ms", elpased_ms);
@@ -1595,6 +1595,8 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
DCHECK(accessor_map_.count(*rid))
<< "uninitilized accessor, instance_id=" << instance_id_
<< " resource_id=" << resource_id << " path[0]=" <<
(*paths)[0];
+
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::delete_rowset_data.no_resource_id",
+ &accessor_map_);
if (!accessor_map_.contains(*rid)) {
LOG_WARNING("delete rowset data accessor_map_ does not
contains resouce id")
.tag("resource_id", resource_id)
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 224b36c277c..0ef150e20d1 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -22,6 +22,7 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <bvar/reducer.h>
+#include <cpp/sync_point.h>
#include <gen_cpp/cloud.pb.h>
#include <algorithm>
@@ -224,6 +225,7 @@ std::string S3Accessor::to_uri(const std::string&
relative_path) const {
}
int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed",
(int)-1);
switch (conf.provider) {
case S3Conf::GCS:
*accessor = std::make_shared<GcsAccessor>(std::move(conf));
diff --git a/cloud/test/hdfs_accessor_test.cpp
b/cloud/test/hdfs_accessor_test.cpp
index 11c0af3853b..40b9fdda3b4 100644
--- a/cloud/test/hdfs_accessor_test.cpp
+++ b/cloud/test/hdfs_accessor_test.cpp
@@ -258,6 +258,10 @@ TEST(HdfsAccessorTest, delete_prefix) {
put_and_verify("data/20000/1_0.dat");
put_and_verify("data111/10000/1_0.dat");
+ ret = accessor.delete_prefix("nonexist");
+ EXPECT_EQ(ret, -1);
+ ret = accessor.delete_prefix("/");
+ EXPECT_EQ(ret, -1);
ret = accessor.delete_prefix("data/10000/1_");
EXPECT_EQ(ret, 0);
ret = accessor.delete_prefix("data/10000/2_");
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 57ebb8c1347..f1834ad8003 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3406,6 +3406,216 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
1);
}
+TEST(RecyclerTest, recycle_tablet_without_resource_id) {
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ });
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ EXPECT_EQ(txn_kv->init(), 0);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ InstanceInfoPB instance;
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+
+ auto accessor = std::make_shared<MockAccessor>();
+ EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+ sp->set_call_back(
+ "InstanceRecycler::init_storage_vault_accessors.mock_vault",
[&accessor](auto&& args) {
+ auto* map = try_any_cast<
+ std::unordered_map<std::string,
std::shared_ptr<StorageVaultAccessor>>*>(
+ args[0]);
+ auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+ if (vault->name() == "test_success_hdfs_vault") {
+ map->emplace(vault->id(), accessor);
+ }
+ });
+ sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta",
[](auto&& args) {
+ auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+ auto* rs = resp->add_rowset_meta();
+ EXPECT_EQ(rs->has_resource_id(), false);
+ });
+ sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = -1;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ // succeed to init MockAccessor
+ {
+ HdfsBuildConf hdfs_build_conf;
+ StorageVaultPB vault;
+ hdfs_build_conf.set_fs_name("fs_name");
+ hdfs_build_conf.set_user("root");
+ HdfsVaultInfo hdfs_info;
+ hdfs_info.set_prefix("root_path");
+ hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+ vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+ vault.set_name("test_success_hdfs_vault");
+ vault.set_id("success_vault");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ txn->put(storage_vault_key({instance.instance_id(), "4"}),
vault.SerializeAsString());
+ }
+
+ val = instance.SerializeAsString();
+ txn->put(key, val);
+ EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ EXPECT_EQ(recycler.init(), 0);
+ EXPECT_EQ(recycler.accessor_map_.size(), 1);
+
+ // useful mock accessor
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
+
+    // recycle tablet will fail because the unusable obj accessor cannot be connected
+ EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ // no resource id, cannot recycle
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
+}
+
+TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) {
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ });
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ EXPECT_EQ(txn_kv->init(), 0);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ InstanceInfoPB instance;
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+
+ auto accessor = std::make_shared<MockAccessor>();
+ EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+ sp->set_call_back(
+ "InstanceRecycler::init_storage_vault_accessors.mock_vault",
[&accessor](auto&& args) {
+ auto* map = try_any_cast<
+ std::unordered_map<std::string,
std::shared_ptr<StorageVaultAccessor>>*>(
+ args[0]);
+ auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+ if (vault->name() == "test_success_hdfs_vault") {
+ map->emplace(vault->id(), accessor);
+ }
+ });
+ sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta",
[](auto&& args) {
+ auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+ auto* rs = resp->add_rowset_meta();
+ rs->set_resource_id("no_id");
+ });
+ sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = -1;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ // succeed to init MockAccessor
+ {
+ HdfsBuildConf hdfs_build_conf;
+ StorageVaultPB vault;
+ hdfs_build_conf.set_fs_name("fs_name");
+ hdfs_build_conf.set_user("root");
+ HdfsVaultInfo hdfs_info;
+ hdfs_info.set_prefix("root_path");
+ hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+ vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+ vault.set_name("test_success_hdfs_vault");
+ vault.set_id("success_vault");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ txn->put(storage_vault_key({instance.instance_id(), "4"}),
vault.SerializeAsString());
+ }
+
+ val = instance.SerializeAsString();
+ txn->put(key, val);
+ EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ EXPECT_EQ(recycler.init(), 0);
+ EXPECT_EQ(recycler.accessor_map_.size(), 1);
+
+ // useful mock accessor
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
+
+    // recycle tablet will fail because the unusable obj accessor cannot be connected
+ EXPECT_EQ(recycler.recycle_tablet(0), -1);
+ // no resource id, cannot recycle
+
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"),
0);
+}
+
+TEST(RecyclerTest, init_all_vault_accessors_failed_test) {
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ });
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ EXPECT_EQ(txn_kv->init(), 0);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key;
+ std::string val;
+
+ InstanceKeyInfo key_info {"test_instance"};
+ instance_key(key_info, &key);
+ InstanceInfoPB instance;
+ instance.set_instance_id("GetObjStoreInfoTestInstance");
+ // failed to init because S3Conf::from_obj_store_info() fails
+ {
+ ObjectStoreInfoPB obj_info;
+ StorageVaultPB vault;
+ obj_info.set_id("id");
+ obj_info.set_ak("ak");
+ obj_info.set_sk("sk");
+ vault.mutable_obj_info()->MergeFrom(obj_info);
+ vault.set_name("test_failed_s3_vault");
+ vault.set_id("failed_s3");
+ instance.add_storage_vault_names(vault.name());
+ instance.add_resource_ids(vault.id());
+ txn->put(storage_vault_key({instance.instance_id(), "1"}),
vault.SerializeAsString());
+ }
+
+ sp->set_call_back("S3Accessor::init.s3_init_failed", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = -1;
+ ret->second = true;
+ });
+ sp->enable_processing();
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ EXPECT_EQ(recycler.init(), -2);
+}
+
TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
auto* sp = SyncPoint::get_instance();
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
@@ -3563,4 +3773,104 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
}
}
+TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) {
+ auto* sp = SyncPoint::get_instance();
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ });
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("delete_tmp_rowset_data_with_idx_v2");
+ obj_info->set_ak(config::test_s3_ak);
+ obj_info->set_sk(config::test_s3_sk);
+ obj_info->set_endpoint(config::test_s3_endpoint);
+ obj_info->set_region(config::test_s3_region);
+ obj_info->set_bucket(config::test_s3_bucket);
+ obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v2");
+
+ doris::TabletSchemaCloudPB schema;
+ schema.set_schema_version(1);
+ auto index = schema.add_index();
+ index->set_index_id(1);
+ index->set_index_type(IndexType::INVERTED);
+
+ sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset",
[](auto&& args) {
+ auto* ret = try_any_cast<int*>(args[0]);
+ *ret = 1;
+ });
+ sp->set_call_back("InstanceRecycler::delete_rowset_data.no_resource_id",
[](auto&& args) {
+ auto* map = try_any_cast<
+ std::unordered_map<std::string,
std::shared_ptr<StorageVaultAccessor>>*>(args[0]);
+ map->erase("no_resource_id");
+ ;
+ });
+ sp->enable_processing();
+
+ {
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto accessor = recycler.accessor_map_.begin()->second;
+ std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ doris::RowsetMetaCloudPB rowset;
+ rowset.set_rowset_id(0); // useless but required
+ rowset.set_rowset_id_v2("1");
+ rowset.set_num_segments(1);
+ rowset.set_tablet_id(10000);
+ rowset.set_index_id(10001);
+ rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v2");
+ rowset.set_schema_version(schema.schema_version());
+ rowset.mutable_tablet_schema()->CopyFrom(schema);
+ create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
+ rowset.clear_tablet_schema();
+ rowset_pbs.emplace_back(rowset);
+
+ rowset.set_rowset_id(0); // useless but required
+ rowset.set_rowset_id_v2("2");
+ rowset.set_num_segments(1);
+ rowset.set_tablet_id(20000);
+ rowset.set_index_id(20001);
+ rowset.set_resource_id("no_resource_id");
+ rowset.set_schema_version(schema.schema_version());
+ rowset.mutable_tablet_schema()->CopyFrom(schema);
+ create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
+ rowset.clear_tablet_schema();
+ rowset_pbs.emplace_back(rowset);
+
+ std::unordered_set<std::string> list_files;
+ std::unique_ptr<ListIterator> iter;
+ EXPECT_EQ(accessor->list_all(&iter), 0);
+ EXPECT_TRUE(iter->has_next());
+ list_files.clear();
+ for (auto file = iter->next(); file.has_value(); file = iter->next()) {
+ list_files.insert(file->path);
+ }
+ EXPECT_EQ(list_files.size(), 4);
+ // before delete tmp rowset, segment file and idx v2 exist
+ EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
+ EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));
+ EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
+ EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
+
+ EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::TMP_ROWSET));
+ list_files.clear();
+ iter.reset();
+ EXPECT_EQ(accessor->list_all(&iter), 0);
+ EXPECT_TRUE(iter->has_next());
+ for (auto file = iter->next(); file.has_value(); file = iter->next()) {
+ list_files.insert(file->path);
+ }
+ EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
+ EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
+        // after delete tmp rowset, for a valid resource id rowset, both file and idx v2 are removed
+ EXPECT_EQ(list_files.size(), 2);
+ }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]