This is an automated email from the ASF dual-hosted git repository.

luwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 555e430d86e [fix](packed-file) improve packed file trailer tooling and 
recycler robustness (#58883)
555e430d86e is described below

commit 555e430d86ebda4ec5076a6dfa93263953c50cf5
Author: Luwei <[email protected]>
AuthorDate: Mon Dec 15 11:45:33 2025 +0800

    [fix](packed-file) improve packed file trailer tooling and recycler 
robustness (#58883)
    
    - Debug & tooling: write/parse a versioned PackedFileDebugInfoPB trailer;
      add packed_file_tool and unit tests for easier debugging.
    - Recycler retries: make packed file ref-count updates
      retryable/configurable to handle TXN_CONFLICTs.
    - Prevent leak on recycle timing: ensure rowsets with packed slices are
      processed even when the tablet is already marked recycled.
    - Trim logs: downgrade noisy state transition logs to VLOG_DEBUG while
      keeping upload completion details.
---
 be/src/io/fs/packed_file_manager.cpp       |  64 +++--
 be/src/io/fs/packed_file_trailer.cpp       | 152 +++++++++++
 be/src/io/fs/packed_file_trailer.h         |  38 +++
 be/src/tools/CMakeLists.txt                |  29 ++
 be/src/tools/packed_file_tool.cpp          |  56 ++++
 be/test/io/fs/packed_file_manager_test.cpp |  18 +-
 be/test/io/packed_file_trailer_test.cpp    | 116 ++++++++
 cloud/src/common/config.h                  |   5 +
 cloud/src/recycler/recycler.cpp            | 417 ++++++++++++++++++-----------
 cloud/src/recycler/recycler.h              |   1 +
 cloud/test/recycler_test.cpp               |  87 ++++++
 gensrc/proto/cloud.proto                   |   6 +
 12 files changed, 799 insertions(+), 190 deletions(-)

diff --git a/be/src/io/fs/packed_file_manager.cpp 
b/be/src/io/fs/packed_file_manager.cpp
index 2d3bcbf5518..94afa6d2bfd 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -41,6 +41,7 @@
 #include "cloud/config.h"
 #include "common/config.h"
 #include "gen_cpp/cloud.pb.h"
+#include "io/fs/packed_file_trailer.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
 #include "util/coding.h"
@@ -84,20 +85,25 @@ Status append_packed_info_trailer(FileWriter* writer, const 
std::string& packed_
                                      packed_file_path);
     }
 
-    std::string serialized_info;
-    if (!packed_file_info.SerializeToString(&serialized_info)) {
-        return Status::InternalError("Failed to serialize packed file info for 
{}",
+    cloud::PackedFileDebugInfoPB debug_pb;
+    debug_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
+
+    std::string serialized_debug_info;
+    if (!debug_pb.SerializeToString(&serialized_debug_info)) {
+        return Status::InternalError("Failed to serialize packed file debug 
info for {}",
                                      packed_file_path);
     }
 
-    if (serialized_info.size() > std::numeric_limits<uint32_t>::max()) {
-        return Status::InternalError("PackedFileInfoPB too large for {}", 
packed_file_path);
+    if (serialized_debug_info.size() >
+        std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
+        return Status::InternalError("PackedFileDebugInfoPB too large for {}", 
packed_file_path);
     }
 
     std::string trailer;
-    trailer.reserve(serialized_info.size() + sizeof(uint32_t));
-    trailer.append(serialized_info);
-    put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_info.size()));
+    trailer.reserve(serialized_debug_info.size() + 
kPackedFileTrailerSuffixSize);
+    trailer.append(serialized_debug_info);
+    put_fixed32_le(&trailer, 
static_cast<uint32_t>(serialized_debug_info.size()));
+    put_fixed32_le(&trailer, kPackedFileTrailerVersion);
 
     return writer->append(Slice(trailer));
 }
@@ -428,9 +434,9 @@ Status 
PackedFileManager::mark_current_packed_file_for_upload_locked(
                 sampler->take_sample();
             }
         }
-        LOG(INFO) << "Packed file " << current->packed_file_path
-                  << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
-                  << active_to_ready_ms;
+        VLOG_DEBUG << "Packed file " << current->packed_file_path
+                   << " transition ACTIVE->READY_TO_UPLOAD; 
active_to_ready_ms="
+                   << active_to_ready_ms;
     }
 
     // Move to uploading files list
@@ -538,10 +544,10 @@ void PackedFileManager::process_uploading_packed_files() {
                     sampler->take_sample();
                 }
             }
-            LOG(INFO) << "Packed file " << packed_file->packed_file_path
-                      << " transition READY_TO_UPLOAD->UPLOADING; "
-                         "ready_to_upload_ms="
-                      << duration_ms;
+            VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
+                       << " transition READY_TO_UPLOAD->UPLOADING; "
+                          "ready_to_upload_ms="
+                       << duration_ms;
         }
     };
 
@@ -588,10 +594,30 @@ void PackedFileManager::process_uploading_packed_files() {
                 sampler->take_sample();
             }
         }
+        int64_t total_ms = -1;
+        if (packed_file->first_append_timestamp.has_value()) {
+            total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                               now - *packed_file->first_append_timestamp)
+                               .count();
+        }
+        std::ostringstream slices_stream;
+        bool first_slice = true;
+        for (const auto& [small_file_path, index] : 
packed_file->slice_locations) {
+            if (!first_slice) {
+                slices_stream << "; ";
+            }
+            first_slice = false;
+            slices_stream << small_file_path << "(txn=" << index.txn_id
+                          << ", offset=" << index.offset << ", size=" << 
index.size << ")";
+        }
         LOG(INFO) << "Packed file " << packed_file->packed_file_path
-                  << " upload completed; active_to_ready_ms=" << 
active_to_ready_ms
+                  << " uploaded; slices=" << 
packed_file->slice_locations.size()
+                  << ", total_bytes=" << packed_file->total_size << ", 
slice_detail=["
+                  << slices_stream.str() << "]"
+                  << ", active_to_ready_ms=" << active_to_ready_ms
                   << ", ready_to_upload_ms=" << ready_to_upload_ms
-                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms;
+                  << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
+                  << ", total_ms=" << total_ms;
         {
             std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
             packed_file->state = PackedFileState::UPLOADED;
@@ -684,9 +710,9 @@ void PackedFileManager::process_uploading_packed_files() {
                 oss << "[" << small_file_path << ", offset=" << index.offset
                     << ", size=" << index.size << "]";
             }
-            LOG(INFO) << oss.str();
+            VLOG_DEBUG << oss.str();
         } else {
-            LOG(INFO) << "Uploading packed file " << packed_file_path << " 
with no small files";
+            VLOG_DEBUG << "Uploading packed file " << packed_file_path << " 
with no small files";
         }
 
         Status upload_status = 
finalize_packed_file_upload(packed_file->packed_file_path,
diff --git a/be/src/io/fs/packed_file_trailer.cpp 
b/be/src/io/fs/packed_file_trailer.cpp
new file mode 100644
index 00000000000..99a87b9c3da
--- /dev/null
+++ b/be/src/io/fs/packed_file_trailer.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/packed_file_trailer.h"
+
+#include <array>
+#include <fstream>
+
+#include "common/status.h"
+#include "util/coding.h"
+
+namespace doris::io {
+
+/// Decodes the debug-info trailer located at the end of `data`.
+///
+/// `data` may be a whole packed file or only its tail, as long as it ends with
+/// the trailer. Two layouts are understood:
+///   - versioned: [PackedFileDebugInfoPB][uint32 length][uint32 version]
+///   - legacy:    [PackedFileInfoPB][uint32 length]
+/// The versioned layout is tried first. A legacy trailer is wrapped into a
+/// PackedFileDebugInfoPB and reported as version 0.
+///
+/// @param data     bytes ending with the trailer
+/// @param debug_pb receives the parsed message; must not be null
+/// @param version  receives the trailer version; must not be null
+/// @return OK on success, InvalidArgument for null outputs, InternalError when
+///         no valid trailer can be decoded
+Status parse_packed_file_trailer(std::string_view data,
+                                 cloud::PackedFileDebugInfoPB* debug_pb,
+                                 uint32_t* version) {
+    if (debug_pb == nullptr || version == nullptr) {
+        return Status::InvalidArgument("Output parameters must not be null");
+    }
+    // A legacy trailer only needs the 4-byte length field, so inputs shorter
+    // than the 8-byte versioned suffix must still reach the legacy branch.
+    if (data.size() < sizeof(uint32_t)) {
+        return Status::InternalError(
+                "Packed file too small to contain trailer");
+    }
+
+    // Preferred format: [PackedFileDebugInfoPB][length][version]
+    if (data.size() >= kPackedFileTrailerSuffixSize) {
+        // Offset of the suffix, which is also the largest possible payload.
+        const size_t suffix_offset = data.size() - kPackedFileTrailerSuffixSize;
+        const auto* suffix_ptr =
+                reinterpret_cast<const uint8_t*>(data.data() + suffix_offset);
+        const uint32_t trailer_size = decode_fixed32_le(suffix_ptr);
+        const uint32_t trailer_version =
+                decode_fixed32_le(suffix_ptr + sizeof(uint32_t));
+        if (trailer_size > 0 && trailer_size <= suffix_offset) {
+            std::string_view payload(data.data() + suffix_offset - trailer_size,
+                                     trailer_size);
+            // ParseFromArray takes an int length.
+            if (payload.size() >
+                static_cast<size_t>(std::numeric_limits<int>::max())) {
+                return Status::InternalError(
+                        "Packed file trailer payload too large");
+            }
+            cloud::PackedFileDebugInfoPB parsed_pb;
+            if (parsed_pb.ParseFromArray(payload.data(),
+                                         static_cast<int>(payload.size()))) {
+                debug_pb->Swap(&parsed_pb);
+                *version = trailer_version;
+                return Status::OK();
+            }
+            // Not a versioned trailer after all; try the legacy layout.
+        }
+    }
+
+    // Legacy format fallback: [PackedFileInfoPB][length]
+    const size_t legacy_suffix_offset = data.size() - sizeof(uint32_t);
+    const auto* legacy_ptr =
+            reinterpret_cast<const uint8_t*>(data.data() + legacy_suffix_offset);
+    const uint32_t legacy_size = decode_fixed32_le(legacy_ptr);
+    if (legacy_size == 0 || legacy_size > legacy_suffix_offset) {
+        return Status::InternalError("Packed file trailer corrupted");
+    }
+    std::string_view legacy_payload(
+            data.data() + legacy_suffix_offset - legacy_size, legacy_size);
+    if (legacy_payload.size() >
+        static_cast<size_t>(std::numeric_limits<int>::max())) {
+        return Status::InternalError(
+                "Packed file legacy trailer payload too large");
+    }
+    cloud::PackedFileInfoPB packed_info;
+    if (!packed_info.ParseFromArray(legacy_payload.data(),
+                                    static_cast<int>(legacy_payload.size()))) {
+        return Status::InternalError("Failed to parse packed file trailer");
+    }
+    debug_pb->Clear();
+    debug_pb->mutable_packed_file_info()->Swap(&packed_info);
+    // Legacy trailers carry no version field.
+    *version = 0;
+    return Status::OK();
+}
+
+/// Reads the trailer of the packed file at `file_path` by loading only the
+/// bytes at the end of the file.
+///
+/// The 8-byte versioned suffix ([length][version]) is probed first. When that
+/// does not produce a trailer whose parsed version equals the suffix version,
+/// the 4-byte legacy suffix ([length]) is used instead.
+///
+/// @param file_path local path of the packed file
+/// @param debug_pb  receives the parsed trailer; must not be null
+/// @param version   receives the trailer version; must not be null
+/// @return OK on success, InvalidArgument for null outputs, IOError when the
+///         file cannot be opened, sized or read, InternalError when the file
+///         is too small or the trailer is corrupted
+Status read_packed_file_trailer(const std::string& file_path,
+                                cloud::PackedFileDebugInfoPB* debug_pb,
+                                uint32_t* version) {
+    if (debug_pb == nullptr || version == nullptr) {
+        return Status::InvalidArgument("Output parameters must not be null");
+    }
+
+    std::ifstream file(file_path, std::ios::binary);
+    if (!file.is_open()) {
+        return Status::IOError("Failed to open packed file {}", file_path);
+    }
+
+    file.seekg(0, std::ios::end);
+    const std::streamoff file_size = file.tellg();
+    // tellg() yields -1 on failure; report that as an I/O problem instead of
+    // letting it be mistaken for a file that is merely too small.
+    if (file_size < 0) {
+        return Status::IOError("Failed to determine size of packed file {}",
+                               file_path);
+    }
+    if (file_size < static_cast<std::streamoff>(sizeof(uint32_t))) {
+        return Status::InternalError("Packed file {} is too small", file_path);
+    }
+
+    // Loads the last `count` bytes of the file into `out`.
+    auto read_tail = [&](std::streamoff count, std::string* out) -> Status {
+        out->assign(static_cast<size_t>(count), '\0');
+        file.seekg(file_size - count);
+        file.read(out->data(), count);
+        if (!file) {
+            return Status::IOError("Failed to read last {} bytes from {}", count,
+                                   file_path);
+        }
+        return Status::OK();
+    };
+
+    // Try new format first.
+    if (file_size >=
+        static_cast<std::streamoff>(kPackedFileTrailerSuffixSize)) {
+        std::array<char, kPackedFileTrailerSuffixSize> suffix {};
+        file.seekg(file_size - static_cast<std::streamoff>(suffix.size()));
+        file.read(suffix.data(), suffix.size());
+        if (file) {
+            const auto* suffix_bytes =
+                    reinterpret_cast<const uint8_t*>(suffix.data());
+            const uint32_t trailer_size = decode_fixed32_le(suffix_bytes);
+            const uint32_t trailer_version =
+                    decode_fixed32_le(suffix_bytes + sizeof(uint32_t));
+            const std::streamoff required = static_cast<std::streamoff>(
+                    kPackedFileTrailerSuffixSize + trailer_size);
+            if (trailer_size > 0 && file_size >= required) {
+                std::string tail;
+                RETURN_IF_ERROR(read_tail(required, &tail));
+                Status st = parse_packed_file_trailer(tail, debug_pb, version);
+                // Accept only when the parser really used the versioned
+                // layout; otherwise retry below with the legacy suffix.
+                if (st.ok() && *version == trailer_version) {
+                    return st;
+                }
+            }
+        }
+        // Reset any failure bits before reusing the stream.
+        file.clear();
+    }
+
+    // Legacy fallback: PackedFileInfoPB + length.
+    std::array<char, sizeof(uint32_t)> legacy_suffix {};
+    file.seekg(file_size - static_cast<std::streamoff>(legacy_suffix.size()));
+    file.read(legacy_suffix.data(), legacy_suffix.size());
+    if (!file) {
+        return Status::IOError("Failed to read legacy trailer length from {}",
+                               file_path);
+    }
+    const uint32_t legacy_size = decode_fixed32_le(
+            reinterpret_cast<const uint8_t*>(legacy_suffix.data()));
+    const std::streamoff required =
+            static_cast<std::streamoff>(sizeof(uint32_t) + legacy_size);
+    if (legacy_size == 0 || file_size < required) {
+        return Status::InternalError("Packed file trailer corrupted for {}",
+                                     file_path);
+    }
+    std::string tail;
+    RETURN_IF_ERROR(read_tail(required, &tail));
+    return parse_packed_file_trailer(tail, debug_pb, version);
+}
+
+} // namespace doris::io
diff --git a/be/src/io/fs/packed_file_trailer.h 
b/be/src/io/fs/packed_file_trailer.h
new file mode 100644
index 00000000000..21954b5507c
--- /dev/null
+++ b/be/src/io/fs/packed_file_trailer.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <string_view>
+
+#include "common/status.h"
+#include "gen_cpp/cloud.pb.h"
+
+namespace doris::io {
+
+// Version value written as the final uint32 of a packed file trailer.
+constexpr uint32_t kPackedFileTrailerVersion = 1;
+// Size in bytes of the fixed trailer suffix: a uint32 payload length followed
+// by a uint32 version.
+constexpr size_t kPackedFileTrailerSuffixSize = sizeof(uint32_t) * 2;
+
+// Parses the trailer at the end of `data`. The versioned layout
+// [PackedFileDebugInfoPB][length][version] is tried first; otherwise the
+// legacy layout [PackedFileInfoPB][length] is parsed and reported as version 0.
+// `debug_pb` and `version` are outputs and must not be null.
+Status parse_packed_file_trailer(std::string_view data, 
cloud::PackedFileDebugInfoPB* debug_pb,
+                                 uint32_t* version);
+
+// Reads the trailer from the end of the local file `file_path` and parses it
+// into `debug_pb` / `version`. Both outputs must not be null.
+Status read_packed_file_trailer(const std::string& file_path,
+                                cloud::PackedFileDebugInfoPB* debug_pb, 
uint32_t* version);
+
+} // namespace doris::io
diff --git a/be/src/tools/CMakeLists.txt b/be/src/tools/CMakeLists.txt
index 93e18980bb5..ab1fc5fba19 100644
--- a/be/src/tools/CMakeLists.txt
+++ b/be/src/tools/CMakeLists.txt
@@ -53,3 +53,32 @@ add_custom_command(TARGET meta_tool POST_BUILD
     COMMAND ${CMAKE_OBJCOPY} --add-gnu-debuglink=$<TARGET_FILE:meta_tool>.dbg 
$<TARGET_FILE:meta_tool>
     )
 endif()
+
+add_executable(packed_file_tool
+    packed_file_tool.cpp
+)
+
+pch_reuse(packed_file_tool)
+
+set_target_properties(packed_file_tool PROPERTIES ENABLE_EXPORTS 1)
+
+if (COMPILER_CLANG)
+    target_compile_options(packed_file_tool PRIVATE
+    -Wno-implicit-int-conversion
+    -Wno-shorten-64-to-32
+    )
+endif()
+
+target_link_libraries(packed_file_tool
+    ${DORIS_LINK_LIBS}
+)
+
+install(TARGETS packed_file_tool DESTINATION ${OUTPUT_DIR}/lib/)
+
+if (NOT OS_MACOSX)
+add_custom_command(TARGET packed_file_tool POST_BUILD
+    COMMAND ${CMAKE_OBJCOPY} --only-keep-debug $<TARGET_FILE:packed_file_tool> 
$<TARGET_FILE:packed_file_tool>.dbg
+    COMMAND ${CMAKE_STRIP} --strip-debug --strip-unneeded 
$<TARGET_FILE:packed_file_tool>
+    COMMAND ${CMAKE_OBJCOPY} 
--add-gnu-debuglink=$<TARGET_FILE:packed_file_tool>.dbg 
$<TARGET_FILE:packed_file_tool>
+    )
+endif()
diff --git a/be/src/tools/packed_file_tool.cpp 
b/be/src/tools/packed_file_tool.cpp
new file mode 100644
index 00000000000..0d77d94b9e3
--- /dev/null
+++ b/be/src/tools/packed_file_tool.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/cloud.pb.h>
+#include <gflags/gflags.h>
+
+#include <iostream>
+#include <string>
+
+#include "common/status.h"
+#include "io/fs/packed_file_trailer.h"
+#include "json2pb/pb_to_json.h"
+
+DEFINE_string(file, "", "Path to a local merge/packed file");
+
+// Command-line entry point: reads the trailer of the packed file given by
+// --file and prints its version followed by the trailer content as pretty
+// JSON. Returns 0 on success and -1 on any failure.
+int main(int argc, char** argv) {
+    google::SetUsageMessage(
+            "Dump packed file trailer for debugging.\n"
+            "Usage: packed_file_tool --file=/path/to/merge_file");
+    google::ParseCommandLineFlags(&argc, &argv, true);
+
+    if (FLAGS_file.empty()) {
+        std::cerr << "Flag --file is required\n";
+        return -1;
+    }
+
+    doris::cloud::PackedFileDebugInfoPB debug_info;
+    uint32_t version = 0;
+    doris::Status st =
+            doris::io::read_packed_file_trailer(FLAGS_file, &debug_info, &version);
+    if (!st.ok()) {
+        std::cerr << "Failed to read packed file trailer: " << st.to_string()
+                  << std::endl;
+        return -1;
+    }
+
+    json2pb::Pb2JsonOptions options;
+    options.pretty_json = true;
+    std::string json;
+    std::string json_error;
+    // Do not print an empty document and exit 0 when the conversion fails.
+    if (!json2pb::ProtoMessageToJson(debug_info, &json, options, &json_error)) {
+        std::cerr << "Failed to convert packed file trailer to JSON: "
+                  << json_error << std::endl;
+        return -1;
+    }
+
+    std::cout << "trailer_version: " << version << "\n" << json << std::endl;
+    return 0;
+}
diff --git a/be/test/io/fs/packed_file_manager_test.cpp 
b/be/test/io/fs/packed_file_manager_test.cpp
index 38b708ae36d..a99ee158ad0 100644
--- a/be/test/io/fs/packed_file_manager_test.cpp
+++ b/be/test/io/fs/packed_file_manager_test.cpp
@@ -38,6 +38,7 @@
 #include "cpp/sync_point.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
+#include "io/fs/packed_file_trailer.h"
 #include "io/fs/path.h"
 #include "util/coding.h"
 #include "util/slice.h"
@@ -607,15 +608,14 @@ TEST_F(PackedFileManagerTest, 
AppendPackedFileInfoToFileTail) {
     ASSERT_NE(writer, nullptr);
 
     const auto& data = writer->written_data();
-    ASSERT_GE(data.size(), payload.size() + sizeof(uint32_t));
-    uint32_t trailer_size = decode_fixed32_le(
-            reinterpret_cast<const uint8_t*>(data.data() + data.size() - 
sizeof(uint32_t)));
-    ASSERT_GE(data.size(), payload.size() + sizeof(uint32_t) + trailer_size);
-
-    std::string serialized_info =
-            data.substr(data.size() - sizeof(uint32_t) - trailer_size, 
trailer_size);
-    cloud::PackedFileInfoPB parsed_info;
-    ASSERT_TRUE(parsed_info.ParseFromString(serialized_info));
+    cloud::PackedFileDebugInfoPB parsed_debug;
+    uint32_t version = 0;
+    auto st = parse_packed_file_trailer(data, &parsed_debug, &version);
+    ASSERT_TRUE(st.ok()) << st;
+    ASSERT_EQ(version, kPackedFileTrailerVersion);
+    ASSERT_TRUE(parsed_debug.has_packed_file_info());
+
+    const auto& parsed_info = parsed_debug.packed_file_info();
     ASSERT_EQ(parsed_info.slices_size(), 1);
     EXPECT_EQ(parsed_info.slices(0).path(), "trailer_path");
     EXPECT_EQ(parsed_info.slices(0).offset(), 0);
diff --git a/be/test/io/packed_file_trailer_test.cpp 
b/be/test/io/packed_file_trailer_test.cpp
new file mode 100644
index 00000000000..0028a237095
--- /dev/null
+++ b/be/test/io/packed_file_trailer_test.cpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/packed_file_trailer.h"
+
+#include <gtest/gtest.h>
+#include <unistd.h>
+
+#include <filesystem>
+#include <fstream>
+#include <string>
+
+#include "common/status.h"
+#include "util/coding.h"
+
+namespace doris::io {
+
+using doris::Status;
+
+namespace {
+// Creates an empty file with a unique name in the system temp directory and
+// returns its path. If mkstemp fails, the unmodified template path is
+// returned.
+std::string unique_temp_file() {
+    std::string path =
+            (std::filesystem::temp_directory_path() / "packed_file_XXXXXX")
+                    .string();
+    const int fd = mkstemp(path.data());
+    if (fd == -1) {
+        return path;
+    }
+    close(fd);
+    return path;
+}
+
+// Overwrites the file at `path` with the raw bytes of `data`.
+void write_file(const std::string& path, const std::string& data) {
+    std::ofstream out(path, std::ios::binary | std::ios::trunc);
+    out.write(data.data(), static_cast<std::streamsize>(data.size()));
+}
+} // namespace
+
+// A file ending in [PackedFileDebugInfoPB][length][version] must be read back
+// with the current trailer version and the original PackedFileInfoPB fields.
+TEST(PackedFileTrailerTest, ReadNewFormatTrailer) {
+    cloud::PackedFileInfoPB info;
+    info.set_resource_id("resource");
+    info.set_total_slice_num(1);
+    auto* slice = info.add_slices();
+    slice->set_path("a");
+    slice->set_offset(10);
+    slice->set_size(20);
+
+    cloud::PackedFileDebugInfoPB debug_pb;
+    debug_pb.mutable_packed_file_info()->CopyFrom(info);
+
+    std::string serialized_debug;
+    ASSERT_TRUE(debug_pb.SerializeToString(&serialized_debug));
+
+    // Leading bytes stand in for the packed data that precedes the trailer.
+    std::string file_content = "data";
+    file_content.append(serialized_debug);
+    put_fixed32_le(&file_content,
+                   static_cast<uint32_t>(serialized_debug.size()));
+    put_fixed32_le(&file_content, kPackedFileTrailerVersion);
+
+    auto path = unique_temp_file();
+    // Remove the temp file even when a fatal ASSERT_* below returns early.
+    struct TempFileGuard {
+        std::string path;
+        ~TempFileGuard() {
+            std::error_code ec;
+            std::filesystem::remove(path, ec);
+        }
+    } guard {path};
+    write_file(path, file_content);
+
+    cloud::PackedFileDebugInfoPB parsed;
+    uint32_t version = 0;
+    Status st = read_packed_file_trailer(path, &parsed, &version);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(version, kPackedFileTrailerVersion);
+    ASSERT_TRUE(parsed.has_packed_file_info());
+    EXPECT_EQ(parsed.packed_file_info().resource_id(), info.resource_id());
+    ASSERT_EQ(parsed.packed_file_info().slices_size(), 1);
+    EXPECT_EQ(parsed.packed_file_info().slices(0).path(), "a");
+    EXPECT_EQ(parsed.packed_file_info().slices(0).offset(), 10);
+    EXPECT_EQ(parsed.packed_file_info().slices(0).size(), 20);
+}
+
+// A file ending in the legacy layout [PackedFileInfoPB][length] must be read
+// back as version 0 with the info wrapped in a PackedFileDebugInfoPB.
+TEST(PackedFileTrailerTest, ReadLegacyTrailer) {
+    cloud::PackedFileInfoPB info;
+    info.set_total_slice_num(2);
+    info.set_remaining_slice_bytes(100);
+
+    std::string serialized_info;
+    ASSERT_TRUE(info.SerializeToString(&serialized_info));
+
+    // Leading bytes stand in for the packed data that precedes the trailer.
+    std::string file_content = "legacy_payload";
+    file_content.append(serialized_info);
+    put_fixed32_le(&file_content,
+                   static_cast<uint32_t>(serialized_info.size()));
+
+    auto path = unique_temp_file();
+    // Remove the temp file even when a fatal ASSERT_* below returns early.
+    struct TempFileGuard {
+        std::string path;
+        ~TempFileGuard() {
+            std::error_code ec;
+            std::filesystem::remove(path, ec);
+        }
+    } guard {path};
+    write_file(path, file_content);
+
+    cloud::PackedFileDebugInfoPB parsed;
+    uint32_t version = 0;
+    Status st = read_packed_file_trailer(path, &parsed, &version);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(version, 0);
+    ASSERT_TRUE(parsed.has_packed_file_info());
+    EXPECT_EQ(parsed.packed_file_info().total_slice_num(),
+              info.total_slice_num());
+    EXPECT_EQ(parsed.packed_file_info().remaining_slice_bytes(),
+              info.remaining_slice_bytes());
+}
+
+} // namespace doris::io
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index f085ce04244..28c121eec9b 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -115,6 +115,11 @@ CONF_mInt64(check_recycle_task_interval_seconds, "600"); 
// 10min
 CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
 // log a warning if a recycle task takes longer than this duration
 CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
+CONF_mInt32(decrement_packed_file_ref_counts_retry_times, "10");
+CONF_mInt32(packed_file_txn_retry_times, "10");
+// randomized interval to reduce conflict storms in FoundationDB, default 
5-50ms
+CONF_mInt64(packed_file_txn_retry_sleep_min_ms, "5");
+CONF_mInt64(packed_file_txn_retry_sleep_max_ms, "50");
 
 // force recycler to recycle all useless object.
 // **just for TEST**
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index eabe17dce34..c9c6065722f 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -36,8 +36,10 @@
 #include <initializer_list>
 #include <memory>
 #include <numeric>
+#include <random>
 #include <string>
 #include <string_view>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <variant>
@@ -78,6 +80,22 @@ namespace doris::cloud {
 
 using namespace std::chrono;
 
+namespace {
+
+// Draws a random delay in milliseconds from the configured
+// [packed_file_txn_retry_sleep_min_ms, packed_file_txn_retry_sleep_max_ms]
+// range. The lower bound is clamped to >= 0 and the upper bound to >= lower.
+int64_t packed_file_retry_sleep_ms() {
+    const int64_t lower =
+            std::max<int64_t>(0, config::packed_file_txn_retry_sleep_min_ms);
+    const int64_t upper = std::max<int64_t>(
+            lower, config::packed_file_txn_retry_sleep_max_ms);
+    // One generator per thread, seeded once from std::random_device.
+    thread_local std::mt19937_64 rng(std::random_device {}());
+    return std::uniform_int_distribution<int64_t>(lower, upper)(rng);
+}
+
+// Blocks the calling thread for one randomized retry delay.
+void sleep_for_packed_file_retry() {
+    const std::chrono::milliseconds delay(packed_file_retry_sleep_ms());
+    std::this_thread::sleep_for(delay);
+}
+
+} // namespace
+
 // return 0 for success get a key, 1 for key not found, negative for error
 [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, 
std::string& val) {
     std::unique_ptr<Transaction> txn;
@@ -1190,132 +1208,160 @@ int 
InstanceRecycler::correct_packed_file_info(cloud::PackedFileInfoPB* packed_i
 int InstanceRecycler::process_single_packed_file(const std::string& packed_key,
                                                  const std::string& 
packed_file_path,
                                                  PackedFileRecycleStats* 
stats) {
-    if (stopped()) {
-        LOG_WARNING("recycler stopped before processing packed file")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
-        return -1;
-    }
-
-    std::unique_ptr<Transaction> txn;
-    TxnErrorCode err = txn_kv_->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to create txn when processing packed file")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path)
-                .tag("err", err);
-        return -1;
-    }
+    const int max_retry_times = std::max(1, 
config::packed_file_txn_retry_times);
+    bool correction_ok = false;
+    cloud::PackedFileInfoPB packed_info;
 
-    std::string packed_val;
-    err = txn->get(packed_key, &packed_val);
-    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-        return 0;
-    }
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to get packed file kv")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path)
-                .tag("err", err);
-        return -1;
-    }
+    for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
+        if (stopped()) {
+            LOG_WARNING("recycler stopped before processing packed file")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt);
+            return -1;
+        }
 
-    cloud::PackedFileInfoPB packed_info;
-    if (!packed_info.ParseFromString(packed_val)) {
-        LOG_WARNING("failed to parse packed file info")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
-        return -1;
-    }
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to create txn when processing packed file")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt)
+                    .tag("err", err);
+            return -1;
+        }
 
-    int64_t now_sec = ::time(nullptr);
-    bool corrected = packed_info.corrected();
-    bool due =
-            config::force_immediate_recycle ||
-            now_sec - packed_info.created_at_sec() >= 
config::packed_file_correction_delay_seconds;
+        std::string packed_val;
+        err = txn->get(packed_key, &packed_val);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            return 0;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to get packed file kv")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt)
+                    .tag("err", err);
+            return -1;
+        }
 
-    if (!corrected && due) {
-        bool changed = false;
-        if (correct_packed_file_info(&packed_info, &changed, packed_file_path, 
stats) != 0) {
-            LOG_WARNING("correct_packed_file_info failed")
+        if (!packed_info.ParseFromString(packed_val)) {
+            LOG_WARNING("failed to parse packed file info")
                     .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path);
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt);
             return -1;
         }
-        if (changed) {
-            std::string updated;
-            if (!packed_info.SerializeToString(&updated)) {
-                LOG_WARNING("failed to serialize packed file info after 
correction")
+
+        int64_t now_sec = ::time(nullptr);
+        bool corrected = packed_info.corrected();
+        bool due = config::force_immediate_recycle ||
+                   now_sec - packed_info.created_at_sec() >=
+                           config::packed_file_correction_delay_seconds;
+
+        if (!corrected && due) {
+            bool changed = false;
+            if (correct_packed_file_info(&packed_info, &changed, 
packed_file_path, stats) != 0) {
+                LOG_WARNING("correct_packed_file_info failed")
                         .tag("instance_id", instance_id_)
-                        .tag("packed_file_path", packed_file_path);
+                        .tag("packed_file_path", packed_file_path)
+                        .tag("attempt", attempt);
                 return -1;
             }
-            txn->put(packed_key, updated);
-            err = txn->commit();
-            if (err == TxnErrorCode::TXN_OK) {
-                if (stats) {
-                    ++stats->num_corrected;
-                }
-            } else {
-                if (err == TxnErrorCode::TXN_CONFLICT) {
-                    LOG_WARNING("failed to commit correction for packed file 
due to conflict")
+            if (changed) {
+                std::string updated;
+                if (!packed_info.SerializeToString(&updated)) {
+                    LOG_WARNING("failed to serialize packed file info after 
correction")
                             .tag("instance_id", instance_id_)
-                            .tag("packed_file_path", packed_file_path);
+                            .tag("packed_file_path", packed_file_path)
+                            .tag("attempt", attempt);
+                    return -1;
+                }
+                txn->put(packed_key, updated);
+                err = txn->commit();
+                if (err == TxnErrorCode::TXN_OK) {
+                    if (stats) {
+                        ++stats->num_corrected;
+                    }
                 } else {
+                    if (err == TxnErrorCode::TXN_CONFLICT && attempt < 
max_retry_times) {
+                        LOG_WARNING(
+                                "failed to commit correction for packed file 
due to conflict, "
+                                "retrying")
+                                .tag("instance_id", instance_id_)
+                                .tag("packed_file_path", packed_file_path)
+                                .tag("attempt", attempt);
+                        sleep_for_packed_file_retry();
+                        packed_info.Clear();
+                        continue;
+                    }
                     LOG_WARNING("failed to commit correction for packed file")
                             .tag("instance_id", instance_id_)
                             .tag("packed_file_path", packed_file_path)
+                            .tag("attempt", attempt)
                             .tag("err", err);
+                    return -1;
                 }
-                return -1;
             }
         }
+
+        correction_ok = true;
+        break;
     }
 
-    txn.reset();
+    if (!correction_ok) {
+        return -1;
+    }
 
-    if (packed_info.state() == cloud::PackedFileInfoPB::RECYCLING && 
packed_info.ref_cnt() == 0) {
-        if (!packed_info.has_resource_id() || 
packed_info.resource_id().empty()) {
-            LOG_WARNING("packed file missing resource id when recycling")
-                    .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path);
-            return -1;
-        }
-        auto [resource_id, accessor] = 
resolve_packed_file_accessor(packed_info.resource_id());
-        if (!accessor) {
-            LOG_WARNING("no accessor available to delete packed file")
-                    .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path)
-                    .tag("resource_id", packed_info.resource_id());
-            return -1;
-        }
-        int del_ret = accessor->delete_file(packed_file_path);
-        if (del_ret != 0 && del_ret != 1) {
-            LOG_WARNING("failed to delete packed file")
-                    .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path)
-                    .tag("resource_id", resource_id)
-                    .tag("ret", del_ret);
-            return -1;
-        }
-        if (del_ret == 1) {
-            LOG_INFO("packed file already removed")
-                    .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path)
-                    .tag("resource_id", resource_id);
-        } else {
-            LOG_INFO("deleted packed file")
-                    .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path)
-                    .tag("resource_id", resource_id);
-        }
+    if (!(packed_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
+          packed_info.ref_cnt() == 0)) {
+        return 0;
+    }
 
+    if (!packed_info.has_resource_id() || packed_info.resource_id().empty()) {
+        LOG_WARNING("packed file missing resource id when recycling")
+                .tag("instance_id", instance_id_)
+                .tag("packed_file_path", packed_file_path);
+        return -1;
+    }
+    auto [resource_id, accessor] = 
resolve_packed_file_accessor(packed_info.resource_id());
+    if (!accessor) {
+        LOG_WARNING("no accessor available to delete packed file")
+                .tag("instance_id", instance_id_)
+                .tag("packed_file_path", packed_file_path)
+                .tag("resource_id", packed_info.resource_id());
+        return -1;
+    }
+    int del_ret = accessor->delete_file(packed_file_path);
+    if (del_ret != 0 && del_ret != 1) {
+        LOG_WARNING("failed to delete packed file")
+                .tag("instance_id", instance_id_)
+                .tag("packed_file_path", packed_file_path)
+                .tag("resource_id", resource_id)
+                .tag("ret", del_ret);
+        return -1;
+    }
+    if (del_ret == 1) {
+        LOG_INFO("packed file already removed")
+                .tag("instance_id", instance_id_)
+                .tag("packed_file_path", packed_file_path)
+                .tag("resource_id", resource_id);
+    } else {
+        LOG_INFO("deleted packed file")
+                .tag("instance_id", instance_id_)
+                .tag("packed_file_path", packed_file_path)
+                .tag("resource_id", resource_id);
+    }
+
+    for (int del_attempt = 1; del_attempt <= max_retry_times; ++del_attempt) {
         std::unique_ptr<Transaction> del_txn;
-        err = txn_kv_->create_txn(&del_txn);
+        TxnErrorCode err = txn_kv_->create_txn(&del_txn);
         if (err != TxnErrorCode::TXN_OK) {
             LOG_WARNING("failed to create txn when removing packed file kv")
                     .tag("instance_id", instance_id_)
                     .tag("packed_file_path", packed_file_path)
+                    .tag("del_attempt", del_attempt)
                     .tag("err", err);
             return -1;
         }
@@ -1329,6 +1375,7 @@ int InstanceRecycler::process_single_packed_file(const 
std::string& packed_key,
             LOG_WARNING("failed to re-read packed file kv before removal")
                     .tag("instance_id", instance_id_)
                     .tag("packed_file_path", packed_file_path)
+                    .tag("del_attempt", del_attempt)
                     .tag("err", err);
             return -1;
         }
@@ -1337,7 +1384,8 @@ int InstanceRecycler::process_single_packed_file(const 
std::string& packed_key,
         if (!latest_info.ParseFromString(latest_val)) {
             LOG_WARNING("failed to parse packed file info before removal")
                     .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path);
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("del_attempt", del_attempt);
             return -1;
         }
 
@@ -1345,7 +1393,8 @@ int InstanceRecycler::process_single_packed_file(const 
std::string& packed_key,
               latest_info.ref_cnt() == 0)) {
             LOG_INFO("packed file state changed before removal, skip deleting 
kv")
                     .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path);
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("del_attempt", del_attempt);
             return 0;
         }
 
@@ -1371,19 +1420,29 @@ int InstanceRecycler::process_single_packed_file(const 
std::string& packed_key,
             return 0;
         }
         if (err == TxnErrorCode::TXN_CONFLICT) {
-            LOG_WARNING("failed to remove packed file kv due to conflict")
+            if (del_attempt >= max_retry_times) {
+                LOG_WARNING("failed to remove packed file kv due to conflict 
after max retry")
+                        .tag("instance_id", instance_id_)
+                        .tag("packed_file_path", packed_file_path)
+                        .tag("del_attempt", del_attempt);
+                return -1;
+            }
+            LOG_WARNING("failed to remove packed file kv due to conflict, 
retrying")
                     .tag("instance_id", instance_id_)
-                    .tag("packed_file_path", packed_file_path);
-            return -1;
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("del_attempt", del_attempt);
+            sleep_for_packed_file_retry();
+            continue;
         }
         LOG_WARNING("failed to remove packed file kv")
                 .tag("instance_id", instance_id_)
                 .tag("packed_file_path", packed_file_path)
+                .tag("del_attempt", del_attempt)
                 .tag("err", err);
         return -1;
     }
 
-    return 0;
+    return -1;
 }
 
 int InstanceRecycler::handle_packed_file_kv(std::string_view key, 
std::string_view /*value*/,
@@ -2603,6 +2662,7 @@ int 
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
         return 0;
     }
 
+    const int max_retry_times = std::max(1, 
config::decrement_packed_file_ref_counts_retry_times);
     int ret = 0;
     for (auto& [packed_file_path, small_files] : packed_file_updates) {
         if (small_files.empty()) {
@@ -2610,7 +2670,7 @@ int 
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
         }
 
         bool success = false;
-        do {
+        for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
             std::unique_ptr<Transaction> txn;
             TxnErrorCode err = txn_kv_->create_txn(&txn);
             if (err != TxnErrorCode::TXN_OK) {
@@ -2763,14 +2823,26 @@ int 
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
                 break;
             }
             if (err == TxnErrorCode::TXN_CONFLICT) {
-                LOG_WARNING("packed file info update conflict, not retrying")
+                if (attempt >= max_retry_times) {
+                    LOG_WARNING("packed file info update conflict after max 
retry")
+                            .tag("instance_id", instance_id_)
+                            .tag("packed_file_path", packed_file_path)
+                            .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                            .tag("tablet_id", rs_meta_pb.tablet_id())
+                            .tag("changed_files", changed_files)
+                            .tag("attempt", attempt);
+                    ret = -1;
+                    break;
+                }
+                LOG_WARNING("packed file info update conflict, retrying")
                         .tag("instance_id", instance_id_)
                         .tag("packed_file_path", packed_file_path)
                         .tag("rowset_id", rs_meta_pb.rowset_id_v2())
                         .tag("tablet_id", rs_meta_pb.tablet_id())
-                        .tag("changed_files", changed_files);
-                ret = -1;
-                break;
+                        .tag("changed_files", changed_files)
+                        .tag("attempt", attempt);
+                sleep_for_packed_file_retry();
+                continue;
             }
 
             LOG_WARNING("failed to commit packed file info update")
@@ -2782,7 +2854,7 @@ int 
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
                     .tag("changed_files", changed_files);
             ret = -1;
             break;
-        } while (false);
+        }
 
         if (!success) {
             ret = -1;
@@ -2832,63 +2904,81 @@ int InstanceRecycler::delete_packed_file_and_kv(const 
std::string& packed_file_p
                 .tag("resource_id", resource_id);
     }
 
-    std::unique_ptr<Transaction> del_txn;
-    TxnErrorCode err = txn_kv_->create_txn(&del_txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to create txn when removing packed file kv")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path)
-                .tag("err", err);
-        return -1;
-    }
+    const int max_retry_times = std::max(1, 
config::packed_file_txn_retry_times);
+    for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
+        std::unique_ptr<Transaction> del_txn;
+        TxnErrorCode err = txn_kv_->create_txn(&del_txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to create txn when removing packed file kv")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt)
+                    .tag("err", err);
+            return -1;
+        }
 
-    std::string latest_val;
-    err = del_txn->get(packed_key, &latest_val);
-    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-        return 0;
-    }
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG_WARNING("failed to re-read packed file kv before removal")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path)
-                .tag("err", err);
-        return -1;
-    }
+        std::string latest_val;
+        err = del_txn->get(packed_key, &latest_val);
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            return 0;
+        }
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to re-read packed file kv before removal")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt)
+                    .tag("err", err);
+            return -1;
+        }
 
-    cloud::PackedFileInfoPB latest_info;
-    if (!latest_info.ParseFromString(latest_val)) {
-        LOG_WARNING("failed to parse packed file info before removal")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
-        return -1;
-    }
+        cloud::PackedFileInfoPB latest_info;
+        if (!latest_info.ParseFromString(latest_val)) {
+            LOG_WARNING("failed to parse packed file info before removal")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt);
+            return -1;
+        }
 
-    if (!(latest_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
-          latest_info.ref_cnt() == 0)) {
-        LOG_INFO("packed file state changed before removal, skip deleting kv")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
-        return 0;
-    }
+        if (!(latest_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
+              latest_info.ref_cnt() == 0)) {
+            LOG_INFO("packed file state changed before removal, skip deleting 
kv")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt);
+            return 0;
+        }
 
-    del_txn->remove(packed_key);
-    err = del_txn->commit();
-    if (err == TxnErrorCode::TXN_OK) {
-        LOG_INFO("removed packed file metadata")
-                .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
-        return 0;
-    }
-    if (err == TxnErrorCode::TXN_CONFLICT) {
-        LOG_WARNING("failed to remove packed file kv due to conflict")
+        del_txn->remove(packed_key);
+        err = del_txn->commit();
+        if (err == TxnErrorCode::TXN_OK) {
+            LOG_INFO("removed packed file metadata")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path);
+            return 0;
+        }
+        if (err == TxnErrorCode::TXN_CONFLICT) {
+            if (attempt >= max_retry_times) {
+                LOG_WARNING("failed to remove packed file kv due to conflict 
after max retry")
+                        .tag("instance_id", instance_id_)
+                        .tag("packed_file_path", packed_file_path)
+                        .tag("attempt", attempt);
+                return -1;
+            }
+            LOG_WARNING("failed to remove packed file kv due to conflict, 
retrying")
+                    .tag("instance_id", instance_id_)
+                    .tag("packed_file_path", packed_file_path)
+                    .tag("attempt", attempt);
+            sleep_for_packed_file_retry();
+            continue;
+        }
+        LOG_WARNING("failed to remove packed file kv")
                 .tag("instance_id", instance_id_)
-                .tag("packed_file_path", packed_file_path);
+                .tag("packed_file_path", packed_file_path)
+                .tag("attempt", attempt)
+                .tag("err", err);
         return -1;
     }
-    LOG_WARNING("failed to remove packed file kv")
-            .tag("instance_id", instance_id_)
-            .tag("packed_file_path", packed_file_path)
-            .tag("err", err);
     return -1;
 }
 
@@ -2907,8 +2997,11 @@ int InstanceRecycler::delete_rowset_data(
         // due to aborted schema change.
         if (is_formal_rowset) {
             std::lock_guard lock(recycled_tablets_mtx_);
-            if (recycled_tablets_.count(rs.tablet_id())) {
-                continue; // Rowset data has already been deleted
+            if (recycled_tablets_.count(rs.tablet_id()) && 
rs.packed_slice_locations_size() == 0) {
+                // Tablet has been recycled and this rowset has no packed slices, so file data
+                // should already be gone; skip to avoid redundant deletes. Rowsets with packed
+                // slice info must still run to decrement packed file ref counts.
+                continue;
             }
         }
 
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index a9f2b2c662d..5cf87bcffe0 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -434,6 +434,7 @@ private:
 
     // return 0 for success otherwise error
     int decrement_packed_file_ref_counts(const doris::RowsetMetaCloudPB& 
rs_meta_pb);
+    friend class 
RecyclerTest_delete_rowset_data_packed_file_respects_recycled_tablet_Test;
 
     int delete_packed_file_and_kv(const std::string& packed_file_path,
                                   const std::string& packed_key,
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index abb1d706998..a308a1cfef8 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -5555,6 +5555,93 @@ TEST(RecyclerTest, 
delete_rowset_data_packed_file_single_rowset) {
     EXPECT_EQ(1, accessor->exists(packed_file_path));
 }
 
+TEST(RecyclerTest, delete_rowset_data_packed_file_respects_recycled_tablet) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    constexpr std::string_view kResourceId = 
"packed_file_resource_recycled_tablet";
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id(std::string(kResourceId));
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix(std::string(kResourceId));
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto accessor = recycler.accessor_map_.begin()->second;
+
+    doris::TabletSchemaCloudPB schema;
+    schema.set_schema_version(2);
+    schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
+
+    auto rowset = create_rowset(std::string(kResourceId), 123456, 2001, 2, 
schema, 7201);
+    ASSERT_EQ(0, create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 
true));
+
+    auto merged_map = rowset.mutable_packed_slice_locations();
+    std::string packed_file_path =
+            fmt::format("merge/{}/{}_recycled.dat", rowset.tablet_id(), 
rowset.rowset_id_v2());
+    constexpr int64_t kSliceBytes = 256;
+    for (int i = 0; i < rowset.num_segments(); ++i) {
+        std::string small_path = segment_path(rowset.tablet_id(), 
rowset.rowset_id_v2(), i);
+        auto& index_pb = (*merged_map)[small_path];
+        index_pb.set_packed_file_path(packed_file_path);
+        index_pb.set_offset(i * kSliceBytes);
+        index_pb.set_size(kSliceBytes);
+    }
+
+    accessor->put_file(packed_file_path, "");
+    PackedFileInfoPB merged_info;
+    merged_info.set_ref_cnt(rowset.num_segments());
+    merged_info.set_total_slice_num(rowset.num_segments());
+    merged_info.set_total_slice_bytes(kSliceBytes * rowset.num_segments());
+    merged_info.set_remaining_slice_bytes(kSliceBytes * rowset.num_segments());
+    merged_info.set_state(PackedFileInfoPB::NORMAL);
+    merged_info.set_resource_id(std::string(kResourceId));
+    for (const auto& [small_path, index_pb] : *merged_map) {
+        auto* small_file = merged_info.add_slices();
+        small_file->set_path(small_path);
+        small_file->set_offset(index_pb.offset());
+        small_file->set_size(index_pb.size());
+        small_file->set_deleted(false);
+        small_file->set_txn_id(next_small_file_txn_id());
+        small_file->set_rowset_id(rowset.rowset_id_v2());
+        small_file->set_tablet_id(rowset.tablet_id());
+    }
+
+    std::string merged_key = packed_file_key({instance_id, packed_file_path});
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+        std::string merged_val;
+        ASSERT_TRUE(merged_info.SerializeToString(&merged_val));
+        txn->put(merged_key, merged_val);
+        ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+    }
+
+    // Simulate tablet already recycled; rowset with packed slices should still be processed.
+    {
+        std::lock_guard lock(recycler.recycled_tablets_mtx_);
+        recycler.recycled_tablets_.insert(rowset.tablet_id());
+    }
+
+    std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
+    rowsets.emplace(rowset.rowset_id_v2(), rowset);
+    RecyclerMetricsContext metrics_ctx(instance_id, "delete_rowset_data");
+    ASSERT_EQ(0, recycler.delete_rowset_data(rowsets, 
RowsetRecyclingState::FORMAL_ROWSET,
+                                             metrics_ctx));
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+    std::string updated_val;
+    EXPECT_EQ(TxnErrorCode::TXN_KEY_NOT_FOUND, txn->get(merged_key, 
&updated_val));
+}
+
 TEST(RecyclerTest, delete_rowset_data_packed_file_batch_rowsets) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 30e3a5f4d46..8f502cf439f 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -2157,6 +2157,12 @@ message PackedFileInfoPB {
     optional string resource_id = 9;
 }
 
+// Wrapper for packed file debug information. It keeps PackedFileInfoPB extensible for tools
+// reading packed file trailers.
+message PackedFileDebugInfoPB {
+    optional PackedFileInfoPB packed_file_info = 1;
+}
+
 message UpdatePackedFileInfoRequest {
     optional string cloud_unique_id = 1; // For auth
     optional string request_ip = 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to