This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 12ba87a8d05 branch-3.0: [fix](recycler) fix idx file leak in recycler 
#44908 (#44966)
12ba87a8d05 is described below

commit 12ba87a8d0579a77a3e930ae6cc280252baec7f1
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 4 14:08:16 2024 +0800

    branch-3.0: [fix](recycler) fix idx file leak in recycler #44908 (#44966)
    
    Cherry-picked from #44908
    
    Co-authored-by: airborne12 <[email protected]>
---
 cloud/src/recycler/recycler.cpp |  28 ++++++-----
 cloud/test/recycler_test.cpp    | 105 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 1b21ec68916..f7000ea3792 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -417,8 +417,12 @@ public:
             LOG(WARNING) << "malformed schema value, key=" << hex(schema_key);
             return -1;
         }
-        if (schema.index_size() > 0 && 
schema.has_inverted_index_storage_format()) {
-            res.first = schema.inverted_index_storage_format();
+        if (schema.index_size() > 0) {
+            InvertedIndexStorageFormatPB index_format = 
InvertedIndexStorageFormatPB::V1;
+            if (schema.has_inverted_index_storage_format()) {
+                index_format = schema.inverted_index_storage_format();
+            }
+            res.first = index_format;
             res.second.reserve(schema.index_size());
             for (auto& i : schema.index()) {
                 if (i.has_index_type() && i.index_type() == 
IndexType::INVERTED) {
@@ -1382,17 +1386,19 @@ int InstanceRecycler::delete_rowset_data(const 
doris::RowsetMetaCloudPB& rs_meta
     }
     std::vector<std::string> file_paths;
     auto tablet_schema = rs_meta_pb.tablet_schema();
+    auto index_storage_format = InvertedIndexStorageFormatPB::V1;
     for (int64_t i = 0; i < num_segments; ++i) {
         file_paths.push_back(segment_path(tablet_id, rowset_id, i));
         if (tablet_schema.has_inverted_index_storage_format()) {
-            if (tablet_schema.inverted_index_storage_format() == 
InvertedIndexStorageFormatPB::V1) {
-                for (const auto& index_id : index_ids) {
-                    file_paths.push_back(inverted_index_path_v1(tablet_id, 
rowset_id, i,
-                                                                
index_id.first, index_id.second));
-                }
-            } else if (!index_ids.empty()) {
-                file_paths.push_back(inverted_index_path_v2(tablet_id, 
rowset_id, i));
+            index_storage_format = 
tablet_schema.inverted_index_storage_format();
+        }
+        if (index_storage_format == InvertedIndexStorageFormatPB::V1) {
+            for (const auto& index_id : index_ids) {
+                file_paths.push_back(inverted_index_path_v1(tablet_id, 
rowset_id, i, index_id.first,
+                                                            index_id.second));
             }
+        } else if (!index_ids.empty()) {
+            file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, 
i));
         }
     }
     // TODO(AlexYue): seems could do do batch
@@ -1429,8 +1435,8 @@ int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaClou
 
         // Process inverted indexes
         std::vector<std::pair<int64_t, std::string>> index_ids;
-        // default format as v2.
-        InvertedIndexStorageFormatPB index_format = 
InvertedIndexStorageFormatPB::V2;
+        // default format as v1.
+        InvertedIndexStorageFormatPB index_format = 
InvertedIndexStorageFormatPB::V1;
 
         if (rs.has_tablet_schema()) {
             for (const auto& index : rs.tablet_schema().index()) {
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index feecf9552f9..0bc16644a82 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3077,4 +3077,109 @@ TEST(RecyclerTest, delete_rowset_data) {
     }
 }
 
+TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_tmp_rowsets");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_tmp_rowsets");
+
+    std::vector<doris::TabletSchemaCloudPB> schemas;
+    for (int i = 0; i < 5; ++i) {
+        auto& schema = schemas.emplace_back();
+        schema.set_schema_version(i);
+        
//schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
+        for (int j = 0; j < i; ++j) {
+            auto index = schema.add_index();
+            index->set_index_id(j);
+            index->set_index_type(IndexType::INVERTED);
+        }
+    }
+
+    {
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        ASSERT_EQ(recycler.init(), 0);
+        auto accessor = recycler.accessor_map_.begin()->second;
+        int64_t txn_id_base = 114115;
+        int64_t tablet_id_base = 10015;
+        int64_t index_id_base = 1000;
+        // Delete each rowset directly using one RowsetPB
+        for (int i = 0; i < 100; ++i) {
+            int64_t txn_id = txn_id_base + i;
+            for (int j = 0; j < 20; ++j) {
+                auto rowset = create_rowset("recycle_tmp_rowsets", 
tablet_id_base + j,
+                                            index_id_base + j % 4, 5, 
schemas[i % 5], txn_id);
+                create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1);
+                ASSERT_EQ(0, recycler.delete_rowset_data(rowset));
+            }
+        }
+
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_all(&list_iter));
+        ASSERT_FALSE(list_iter->has_next());
+    }
+    {
+        InstanceInfoPB tmp_instance;
+        std::string resource_id = "recycle_tmp_rowsets";
+        tmp_instance.set_instance_id(instance_id);
+        auto tmp_obj_info = tmp_instance.add_obj_info();
+        tmp_obj_info->set_id(resource_id);
+        tmp_obj_info->set_ak(config::test_s3_ak);
+        tmp_obj_info->set_sk(config::test_s3_sk);
+        tmp_obj_info->set_endpoint(config::test_s3_endpoint);
+        tmp_obj_info->set_region(config::test_s3_region);
+        tmp_obj_info->set_bucket(config::test_s3_bucket);
+        tmp_obj_info->set_prefix(resource_id);
+
+        InstanceRecycler recycler(txn_kv, tmp_instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        ASSERT_EQ(recycler.init(), 0);
+        auto accessor = recycler.accessor_map_.begin()->second;
+        // Delete multiple rowset files using one series of RowsetPB
+        constexpr int index_id = 10001, tablet_id = 10002;
+        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
+        for (int i = 0; i < 10; ++i) {
+            auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, 
schemas[i % 5]);
+            create_recycle_rowset(
+                    txn_kv.get(), accessor.get(), rowset,
+                    static_cast<RecycleRowsetPB::Type>(i % 
(RecycleRowsetPB::Type_MAX + 1)), true);
+
+            rowset_pbs.emplace_back(std::move(rowset));
+        }
+        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_all(&list_iter));
+        ASSERT_FALSE(list_iter->has_next());
+    }
+    {
+        InstanceRecycler recycler(txn_kv, instance, thread_group,
+                                  std::make_shared<TxnLazyCommitter>(txn_kv));
+        ASSERT_EQ(recycler.init(), 0);
+        auto accessor = recycler.accessor_map_.begin()->second;
+        constexpr int index_id = 20001, tablet_id = 20002;
+        // Delete each rowset file directly, using its ids to construct the path
+        for (int i = 0; i < 1000; ++i) {
+            auto rowset =
+                    create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 
5, schemas[i % 5]);
+            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, 
RecycleRowsetPB::COMPACT,
+                                  true);
+            ASSERT_EQ(0, recycler.delete_rowset_data(rowset.resource_id(), 
rowset.tablet_id(),
+                                                     rowset.rowset_id_v2()));
+        }
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_all(&list_iter));
+        ASSERT_FALSE(list_iter->has_next());
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to