This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 12ba87a8d05 branch-3.0: [fix](recycler) fix idx file leak in recycler #44908 (#44966)
12ba87a8d05 is described below
commit 12ba87a8d0579a77a3e930ae6cc280252baec7f1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 4 14:08:16 2024 +0800
branch-3.0: [fix](recycler) fix idx file leak in recycler #44908 (#44966)
Cherry-picked from #44908
Co-authored-by: airborne12 <[email protected]>
---
cloud/src/recycler/recycler.cpp | 28 ++++++-----
cloud/test/recycler_test.cpp | 105 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 122 insertions(+), 11 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 1b21ec68916..f7000ea3792 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -417,8 +417,12 @@ public:
LOG(WARNING) << "malformed schema value, key=" << hex(schema_key);
return -1;
}
- if (schema.index_size() > 0 && schema.has_inverted_index_storage_format()) {
- res.first = schema.inverted_index_storage_format();
+ if (schema.index_size() > 0) {
+ InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;
+ if (schema.has_inverted_index_storage_format()) {
+ index_format = schema.inverted_index_storage_format();
+ }
+ res.first = index_format;
res.second.reserve(schema.index_size());
for (auto& i : schema.index()) {
if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
@@ -1382,17 +1386,19 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta
}
std::vector<std::string> file_paths;
auto tablet_schema = rs_meta_pb.tablet_schema();
+ auto index_storage_format = InvertedIndexStorageFormatPB::V1;
for (int64_t i = 0; i < num_segments; ++i) {
file_paths.push_back(segment_path(tablet_id, rowset_id, i));
if (tablet_schema.has_inverted_index_storage_format()) {
- if (tablet_schema.inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
- for (const auto& index_id : index_ids) {
- file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
- index_id.first, index_id.second));
- }
- } else if (!index_ids.empty()) {
- file_paths.push_back(inverted_index_path_v2(tablet_id,
rowset_id, i));
+ index_storage_format = tablet_schema.inverted_index_storage_format();
+ }
+ if (index_storage_format == InvertedIndexStorageFormatPB::V1) {
+ for (const auto& index_id : index_ids) {
+ file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, index_id.first,
+ index_id.second));
}
+ } else if (!index_ids.empty()) {
+ file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
}
}
// TODO(AlexYue): seems could do do batch
@@ -1429,8 +1435,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
// Process inverted indexes
std::vector<std::pair<int64_t, std::string>> index_ids;
- // default format as v2.
- InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V2;
+ // default format as v1.
+ InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;
if (rs.has_tablet_schema()) {
for (const auto& index : rs.tablet_schema().index()) {
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index feecf9552f9..0bc16644a82 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3077,4 +3077,109 @@ TEST(RecyclerTest, delete_rowset_data) {
}
}
+TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("recycle_tmp_rowsets");
+ obj_info->set_ak(config::test_s3_ak);
+ obj_info->set_sk(config::test_s3_sk);
+ obj_info->set_endpoint(config::test_s3_endpoint);
+ obj_info->set_region(config::test_s3_region);
+ obj_info->set_bucket(config::test_s3_bucket);
+ obj_info->set_prefix("recycle_tmp_rowsets");
+
+ std::vector<doris::TabletSchemaCloudPB> schemas;
+ for (int i = 0; i < 5; ++i) {
+ auto& schema = schemas.emplace_back();
+ schema.set_schema_version(i);
+ //schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
+ for (int j = 0; j < i; ++j) {
+ auto index = schema.add_index();
+ index->set_index_id(j);
+ index->set_index_type(IndexType::INVERTED);
+ }
+ }
+
+ {
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto accessor = recycler.accessor_map_.begin()->second;
+ int64_t txn_id_base = 114115;
+ int64_t tablet_id_base = 10015;
+ int64_t index_id_base = 1000;
+ // Delete each rowset directly using one RowsetPB
+ for (int i = 0; i < 100; ++i) {
+ int64_t txn_id = txn_id_base + i;
+ for (int j = 0; j < 20; ++j) {
+ auto rowset = create_rowset("recycle_tmp_rowsets", tablet_id_base + j,
+ index_id_base + j % 4, 5, schemas[i % 5], txn_id);
+ create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1);
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset));
+ }
+ }
+
+ std::unique_ptr<ListIterator> list_iter;
+ ASSERT_EQ(0, accessor->list_all(&list_iter));
+ ASSERT_FALSE(list_iter->has_next());
+ }
+ {
+ InstanceInfoPB tmp_instance;
+ std::string resource_id = "recycle_tmp_rowsets";
+ tmp_instance.set_instance_id(instance_id);
+ auto tmp_obj_info = tmp_instance.add_obj_info();
+ tmp_obj_info->set_id(resource_id);
+ tmp_obj_info->set_ak(config::test_s3_ak);
+ tmp_obj_info->set_sk(config::test_s3_sk);
+ tmp_obj_info->set_endpoint(config::test_s3_endpoint);
+ tmp_obj_info->set_region(config::test_s3_region);
+ tmp_obj_info->set_bucket(config::test_s3_bucket);
+ tmp_obj_info->set_prefix(resource_id);
+
+ InstanceRecycler recycler(txn_kv, tmp_instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto accessor = recycler.accessor_map_.begin()->second;
+ // Delete multiple rowset files using one series of RowsetPB
+ constexpr int index_id = 10001, tablet_id = 10002;
+ std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+ for (int i = 0; i < 10; ++i) {
+ auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, schemas[i % 5]);
+ create_recycle_rowset(
+ txn_kv.get(), accessor.get(), rowset,
+ static_cast<RecycleRowsetPB::Type>(i % (RecycleRowsetPB::Type_MAX + 1)), true);
+
+ rowset_pbs.emplace_back(std::move(rowset));
+ }
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
+ std::unique_ptr<ListIterator> list_iter;
+ ASSERT_EQ(0, accessor->list_all(&list_iter));
+ ASSERT_FALSE(list_iter->has_next());
+ }
+ {
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto accessor = recycler.accessor_map_.begin()->second;
+ // Delete multiple rowset files using one series of RowsetPB
+ constexpr int index_id = 20001, tablet_id = 20002;
+ // Delete each rowset file directly using it's id to construct one path
+ for (int i = 0; i < 1000; ++i) {
+ auto rowset =
+ create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
+ create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, RecycleRowsetPB::COMPACT,
+ true);
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset.resource_id(), rowset.tablet_id(),
+ rowset.rowset_id_v2()));
+ }
+ std::unique_ptr<ListIterator> list_iter;
+ ASSERT_EQ(0, accessor->list_all(&list_iter));
+ ASSERT_FALSE(list_iter->has_next());
+ }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]