This is an automated email from the ASF dual-hosted git repository.
luwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 555e430d86e [fix](packed-file) improve packed file trailer tooling and
recycler robustness (#58883)
555e430d86e is described below
commit 555e430d86ebda4ec5076a6dfa93263953c50cf5
Author: Luwei <[email protected]>
AuthorDate: Mon Dec 15 11:45:33 2025 +0800
[fix](packed-file) improve packed file trailer tooling and recycler
robustness (#58883)
Debug & tooling: write/parse versioned PackedFileDebugInfoPB trailer,
add packed_file_tool and unit tests for easier debugging
Recycler retries: make packed file ref-count updates
retryable/configurable to handle TXN_CONFLICTs
Prevent leak on recycle timing: ensure rowsets with packed slices are
processed even when the tablet is already marked recycled
Trim logs: downgrade noisy state transition logs to VLOG_DEBUG while
keeping upload completion details
---
be/src/io/fs/packed_file_manager.cpp | 64 +++--
be/src/io/fs/packed_file_trailer.cpp | 152 +++++++++++
be/src/io/fs/packed_file_trailer.h | 38 +++
be/src/tools/CMakeLists.txt | 29 ++
be/src/tools/packed_file_tool.cpp | 56 ++++
be/test/io/fs/packed_file_manager_test.cpp | 18 +-
be/test/io/packed_file_trailer_test.cpp | 116 ++++++++
cloud/src/common/config.h | 5 +
cloud/src/recycler/recycler.cpp | 417 ++++++++++++++++++-----------
cloud/src/recycler/recycler.h | 1 +
cloud/test/recycler_test.cpp | 87 ++++++
gensrc/proto/cloud.proto | 6 +
12 files changed, 799 insertions(+), 190 deletions(-)
diff --git a/be/src/io/fs/packed_file_manager.cpp
b/be/src/io/fs/packed_file_manager.cpp
index 2d3bcbf5518..94afa6d2bfd 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -41,6 +41,7 @@
#include "cloud/config.h"
#include "common/config.h"
#include "gen_cpp/cloud.pb.h"
+#include "io/fs/packed_file_trailer.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/coding.h"
@@ -84,20 +85,25 @@ Status append_packed_info_trailer(FileWriter* writer, const
std::string& packed_
packed_file_path);
}
- std::string serialized_info;
- if (!packed_file_info.SerializeToString(&serialized_info)) {
- return Status::InternalError("Failed to serialize packed file info for
{}",
+ cloud::PackedFileDebugInfoPB debug_pb;
+ debug_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
+
+ std::string serialized_debug_info;
+ if (!debug_pb.SerializeToString(&serialized_debug_info)) {
+ return Status::InternalError("Failed to serialize packed file debug
info for {}",
packed_file_path);
}
- if (serialized_info.size() > std::numeric_limits<uint32_t>::max()) {
- return Status::InternalError("PackedFileInfoPB too large for {}",
packed_file_path);
+ if (serialized_debug_info.size() >
+ std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
+ return Status::InternalError("PackedFileDebugInfoPB too large for {}",
packed_file_path);
}
std::string trailer;
- trailer.reserve(serialized_info.size() + sizeof(uint32_t));
- trailer.append(serialized_info);
- put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_info.size()));
+ trailer.reserve(serialized_debug_info.size() +
kPackedFileTrailerSuffixSize);
+ trailer.append(serialized_debug_info);
+ put_fixed32_le(&trailer,
static_cast<uint32_t>(serialized_debug_info.size()));
+ put_fixed32_le(&trailer, kPackedFileTrailerVersion);
return writer->append(Slice(trailer));
}
@@ -428,9 +434,9 @@ Status
PackedFileManager::mark_current_packed_file_for_upload_locked(
sampler->take_sample();
}
}
- LOG(INFO) << "Packed file " << current->packed_file_path
- << " transition ACTIVE->READY_TO_UPLOAD; active_to_ready_ms="
- << active_to_ready_ms;
+ VLOG_DEBUG << "Packed file " << current->packed_file_path
+ << " transition ACTIVE->READY_TO_UPLOAD;
active_to_ready_ms="
+ << active_to_ready_ms;
}
// Move to uploading files list
@@ -538,10 +544,10 @@ void PackedFileManager::process_uploading_packed_files() {
sampler->take_sample();
}
}
- LOG(INFO) << "Packed file " << packed_file->packed_file_path
- << " transition READY_TO_UPLOAD->UPLOADING; "
- "ready_to_upload_ms="
- << duration_ms;
+ VLOG_DEBUG << "Packed file " << packed_file->packed_file_path
+ << " transition READY_TO_UPLOAD->UPLOADING; "
+ "ready_to_upload_ms="
+ << duration_ms;
}
};
@@ -588,10 +594,30 @@ void PackedFileManager::process_uploading_packed_files() {
sampler->take_sample();
}
}
+ int64_t total_ms = -1;
+ if (packed_file->first_append_timestamp.has_value()) {
+ total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+ now - *packed_file->first_append_timestamp)
+ .count();
+ }
+ std::ostringstream slices_stream;
+ bool first_slice = true;
+ for (const auto& [small_file_path, index] :
packed_file->slice_locations) {
+ if (!first_slice) {
+ slices_stream << "; ";
+ }
+ first_slice = false;
+ slices_stream << small_file_path << "(txn=" << index.txn_id
+ << ", offset=" << index.offset << ", size=" <<
index.size << ")";
+ }
LOG(INFO) << "Packed file " << packed_file->packed_file_path
- << " upload completed; active_to_ready_ms=" <<
active_to_ready_ms
+ << " uploaded; slices=" <<
packed_file->slice_locations.size()
+ << ", total_bytes=" << packed_file->total_size << ",
slice_detail=["
+ << slices_stream.str() << "]"
+ << ", active_to_ready_ms=" << active_to_ready_ms
<< ", ready_to_upload_ms=" << ready_to_upload_ms
- << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms;
+ << ", uploading_to_uploaded_ms=" << uploading_to_uploaded_ms
+ << ", total_ms=" << total_ms;
{
std::lock_guard<std::mutex> upload_lock(packed_file->upload_mutex);
packed_file->state = PackedFileState::UPLOADED;
@@ -684,9 +710,9 @@ void PackedFileManager::process_uploading_packed_files() {
oss << "[" << small_file_path << ", offset=" << index.offset
<< ", size=" << index.size << "]";
}
- LOG(INFO) << oss.str();
+ VLOG_DEBUG << oss.str();
} else {
- LOG(INFO) << "Uploading packed file " << packed_file_path << "
with no small files";
+ VLOG_DEBUG << "Uploading packed file " << packed_file_path << "
with no small files";
}
Status upload_status =
finalize_packed_file_upload(packed_file->packed_file_path,
diff --git a/be/src/io/fs/packed_file_trailer.cpp
b/be/src/io/fs/packed_file_trailer.cpp
new file mode 100644
index 00000000000..99a87b9c3da
--- /dev/null
+++ b/be/src/io/fs/packed_file_trailer.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/packed_file_trailer.h"
+
+#include <array>
+#include <fstream>
+#include <limits>
+
+#include "common/status.h"
+#include "util/coding.h"
+
+namespace doris::io {
+
+// Parses a packed file trailer from `data`, which must end exactly where the file ends and
+// contain the whole trailer (it may be the complete file or only its tail).
+//
+// Layouts, tried in order:
+//   versioned: [PackedFileDebugInfoPB][payload size: fixed32 LE][version: fixed32 LE]
+//   legacy:    [PackedFileInfoPB][payload size: fixed32 LE]
+//
+// On success `*debug_pb` holds the trailer (a legacy PackedFileInfoPB is wrapped into
+// `packed_file_info`) and `*version` is the version stored in the suffix, or 0 for the legacy
+// layout. Returns InvalidArgument for null outputs and InternalError when `data` is too short
+// or neither layout can be parsed.
+Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileDebugInfoPB* debug_pb,
+                                 uint32_t* version) {
+    if (debug_pb == nullptr || version == nullptr) {
+        return Status::InvalidArgument("Output parameters must not be null");
+    }
+    if (data.size() < kPackedFileTrailerSuffixSize) {
+        return Status::InternalError("Packed file too small to contain trailer");
+    }
+
+    const size_t suffix_offset = data.size() - kPackedFileTrailerSuffixSize;
+    const auto* suffix_ptr = reinterpret_cast<const uint8_t*>(data.data() + suffix_offset);
+    const uint32_t trailer_size = decode_fixed32_le(suffix_ptr);
+    const uint32_t trailer_version = decode_fixed32_le(suffix_ptr + sizeof(uint32_t));
+
+    // Preferred format: [PackedFileDebugInfoPB][length][version]
+    if (trailer_size > 0 && trailer_size <= suffix_offset) {
+        const size_t payload_offset = suffix_offset - trailer_size;
+        std::string_view payload(data.data() + payload_offset, trailer_size);
+        if (payload.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
+            return Status::InternalError("Packed file trailer payload too large");
+        }
+        cloud::PackedFileDebugInfoPB parsed_pb;
+        // Protobuf parsing is lenient: the tail of a legacy trailer may decode "successfully"
+        // as a PackedFileDebugInfoPB consisting only of unknown fields. The writer always sets
+        // packed_file_info, so require it before trusting the versioned layout; otherwise fall
+        // through to the legacy layout.
+        if (parsed_pb.ParseFromArray(payload.data(), static_cast<int>(payload.size())) &&
+            parsed_pb.has_packed_file_info()) {
+            debug_pb->Swap(&parsed_pb);
+            *version = trailer_version;
+            return Status::OK();
+        }
+    }
+
+    // Legacy format fallback: [PackedFileInfoPB][length]. `data` holds at least
+    // kPackedFileTrailerSuffixSize bytes here, so the 4-byte length is always present.
+    const size_t legacy_suffix_offset = data.size() - sizeof(uint32_t);
+    const auto* legacy_ptr = reinterpret_cast<const uint8_t*>(data.data() + legacy_suffix_offset);
+    const uint32_t legacy_size = decode_fixed32_le(legacy_ptr);
+    if (legacy_size == 0 || legacy_size > legacy_suffix_offset) {
+        return Status::InternalError("Packed file trailer corrupted");
+    }
+    std::string_view legacy_payload(data.data() + (legacy_suffix_offset - legacy_size),
+                                    legacy_size);
+    if (legacy_payload.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
+        return Status::InternalError("Packed file legacy trailer payload too large");
+    }
+    cloud::PackedFileInfoPB packed_info;
+    if (!packed_info.ParseFromArray(legacy_payload.data(),
+                                    static_cast<int>(legacy_payload.size()))) {
+        return Status::InternalError("Failed to parse packed file trailer");
+    }
+    debug_pb->Clear();
+    debug_pb->mutable_packed_file_info()->Swap(&packed_info);
+    *version = 0;
+    return Status::OK();
+}
+
+// Reads and parses the trailer of the local packed file at `file_path`.
+//
+// Only the tail of the file is read. The versioned layout
+// ([PackedFileDebugInfoPB][length][version]) is tried first. If its suffix does not describe a
+// trailer that fits in the file, or the parsed version does not match the suffix, the legacy
+// layout ([PackedFileInfoPB][length]) is tried instead. Parsing is delegated to
+// parse_packed_file_trailer().
+//
+// Returns InvalidArgument for null outputs, IOError when the file cannot be opened, sized or
+// read, and InternalError when the file is too small or the trailer is corrupt.
+Status read_packed_file_trailer(const std::string& file_path,
+                                cloud::PackedFileDebugInfoPB* debug_pb, uint32_t* version) {
+    if (debug_pb == nullptr || version == nullptr) {
+        return Status::InvalidArgument("Output parameters must not be null");
+    }
+
+    std::ifstream file(file_path, std::ios::binary);
+    if (!file.is_open()) {
+        return Status::IOError("Failed to open packed file {}", file_path);
+    }
+
+    file.seekg(0, std::ios::end);
+    const std::streamoff file_size = file.tellg();
+    // tellg() yields -1 on failure; report that as an I/O error rather than "too small".
+    if (file_size < 0) {
+        return Status::IOError("Failed to determine size of packed file {}", file_path);
+    }
+    if (file_size < static_cast<std::streamoff>(sizeof(uint32_t))) {
+        return Status::InternalError("Packed file {} is too small", file_path);
+    }
+
+    // Reads the last `count` bytes of the file into `*out`.
+    auto read_tail = [&](std::streamoff count, std::string* out) -> Status {
+        out->assign(static_cast<size_t>(count), '\0');
+        file.seekg(file_size - count);
+        file.read(out->data(), count);
+        if (!file) {
+            return Status::IOError("Failed to read last {} bytes from {}", count, file_path);
+        }
+        return Status::OK();
+    };
+
+    // Try new format first.
+    if (file_size >= static_cast<std::streamoff>(kPackedFileTrailerSuffixSize)) {
+        std::array<char, kPackedFileTrailerSuffixSize> suffix {};
+        file.seekg(file_size - static_cast<std::streamoff>(suffix.size()));
+        file.read(suffix.data(), suffix.size());
+        if (file) {
+            const auto* suffix_ptr = reinterpret_cast<const uint8_t*>(suffix.data());
+            const uint32_t trailer_size = decode_fixed32_le(suffix_ptr);
+            const uint32_t trailer_version = decode_fixed32_le(suffix_ptr + sizeof(uint32_t));
+            const std::streamoff required =
+                    static_cast<std::streamoff>(kPackedFileTrailerSuffixSize + trailer_size);
+            if (trailer_size > 0 && file_size >= required) {
+                std::string tail;
+                RETURN_IF_ERROR(read_tail(required, &tail));
+                Status st = parse_packed_file_trailer(tail, debug_pb, version);
+                // parse_packed_file_trailer() may itself fall back to the legacy layout and
+                // report version 0; only accept the result when it matches the suffix version.
+                if (st.ok() && *version == trailer_version) {
+                    return st;
+                }
+            }
+        }
+        // Clear eof/fail bits so the legacy attempt below can seek and read again.
+        file.clear();
+    }
+
+    // Legacy fallback: PackedFileInfoPB + length.
+    std::array<char, sizeof(uint32_t)> legacy_suffix {};
+    file.seekg(file_size - static_cast<std::streamoff>(legacy_suffix.size()));
+    file.read(legacy_suffix.data(), legacy_suffix.size());
+    if (!file) {
+        return Status::IOError("Failed to read legacy trailer length from {}", file_path);
+    }
+    const uint32_t legacy_size =
+            decode_fixed32_le(reinterpret_cast<const uint8_t*>(legacy_suffix.data()));
+    const std::streamoff required = static_cast<std::streamoff>(sizeof(uint32_t) + legacy_size);
+    if (legacy_size == 0 || file_size < required) {
+        return Status::InternalError("Packed file trailer corrupted for {}", file_path);
+    }
+    std::string tail;
+    RETURN_IF_ERROR(read_tail(required, &tail));
+    return parse_packed_file_trailer(tail, debug_pb, version);
+}
+
+} // namespace doris::io
diff --git a/be/src/io/fs/packed_file_trailer.h
b/be/src/io/fs/packed_file_trailer.h
new file mode 100644
index 00000000000..21954b5507c
--- /dev/null
+++ b/be/src/io/fs/packed_file_trailer.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <string_view>
+
+#include "common/status.h"
+#include "gen_cpp/cloud.pb.h"
+
+namespace doris::io {
+
+// Trailer version written by the current writer into the last 4 bytes of a packed file.
+// Readers report version 0 for the legacy, unversioned layout.
+constexpr uint32_t kPackedFileTrailerVersion = 1;
+// Size of the fixed suffix ending a versioned trailer: payload length (fixed32 LE) followed by
+// the trailer version (fixed32 LE).
+constexpr size_t kPackedFileTrailerSuffixSize = sizeof(uint32_t) * 2;
+
+// Parses the trailer at the end of `data` (the whole file or a tail that contains the complete
+// trailer). Accepts [PackedFileDebugInfoPB][length][version] or the legacy
+// [PackedFileInfoPB][length] layout; for the legacy layout the info is wrapped into
+// `debug_pb->packed_file_info()` and `*version` is set to 0. Both outputs must be non-null.
+Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileDebugInfoPB* debug_pb,
+ uint32_t* version);
+
+// Opens the local file at `file_path`, reads only its tail and parses the trailer with the same
+// layout rules and outputs as parse_packed_file_trailer(). Both outputs must be non-null.
+Status read_packed_file_trailer(const std::string& file_path,
+ cloud::PackedFileDebugInfoPB* debug_pb,
uint32_t* version);
+
+} // namespace doris::io
diff --git a/be/src/tools/CMakeLists.txt b/be/src/tools/CMakeLists.txt
index 93e18980bb5..ab1fc5fba19 100644
--- a/be/src/tools/CMakeLists.txt
+++ b/be/src/tools/CMakeLists.txt
@@ -53,3 +53,32 @@ add_custom_command(TARGET meta_tool POST_BUILD
COMMAND ${CMAKE_OBJCOPY} --add-gnu-debuglink=$<TARGET_FILE:meta_tool>.dbg
$<TARGET_FILE:meta_tool>
)
endif()
+
+# packed_file_tool: standalone debugging binary that dumps the trailer of a local packed
+# (merge) file as JSON. Built from packed_file_tool.cpp and linked against the BE libraries.
+add_executable(packed_file_tool
+ packed_file_tool.cpp
+)
+
+pch_reuse(packed_file_tool)
+
+set_target_properties(packed_file_tool PROPERTIES ENABLE_EXPORTS 1)
+
+# Silence clang's implicit narrowing warnings for this target only.
+if (COMPILER_CLANG)
+ target_compile_options(packed_file_tool PRIVATE
+ -Wno-implicit-int-conversion
+ -Wno-shorten-64-to-32
+ )
+endif()
+
+target_link_libraries(packed_file_tool
+ ${DORIS_LINK_LIBS}
+)
+
+install(TARGETS packed_file_tool DESTINATION ${OUTPUT_DIR}/lib/)
+
+# Outside macOS: copy debug info into packed_file_tool.dbg, strip the installed binary, and
+# record a .gnu_debuglink so debuggers can find the symbols (same steps as meta_tool above).
+if (NOT OS_MACOSX)
+add_custom_command(TARGET packed_file_tool POST_BUILD
+ COMMAND ${CMAKE_OBJCOPY} --only-keep-debug $<TARGET_FILE:packed_file_tool>
$<TARGET_FILE:packed_file_tool>.dbg
+ COMMAND ${CMAKE_STRIP} --strip-debug --strip-unneeded
$<TARGET_FILE:packed_file_tool>
+ COMMAND ${CMAKE_OBJCOPY}
--add-gnu-debuglink=$<TARGET_FILE:packed_file_tool>.dbg
$<TARGET_FILE:packed_file_tool>
+ )
+endif()
diff --git a/be/src/tools/packed_file_tool.cpp
b/be/src/tools/packed_file_tool.cpp
new file mode 100644
index 00000000000..0d77d94b9e3
--- /dev/null
+++ b/be/src/tools/packed_file_tool.cpp
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/cloud.pb.h>
+#include <gflags/gflags.h>
+
+#include <iostream>
+#include <string>
+
+#include "common/status.h"
+#include "io/fs/packed_file_trailer.h"
+#include "json2pb/pb_to_json.h"
+
+DEFINE_string(file, "", "Path to a local merge/packed file");
+
+// Entry point of packed_file_tool. Reads the trailer of the local packed (merge) file named by
+// --file using read_packed_file_trailer() and prints the trailer version followed by the
+// trailer contents as pretty-printed JSON.
+//
+// Returns 0 on success and -1 when --file is missing, the trailer cannot be read, or the
+// trailer cannot be converted to JSON.
+int main(int argc, char** argv) {
+    google::SetUsageMessage(
+            "Dump packed file trailer for debugging.\n"
+            "Usage: packed_file_tool --file=/path/to/merge_file");
+    google::ParseCommandLineFlags(&argc, &argv, true);
+
+    if (FLAGS_file.empty()) {
+        std::cerr << "Flag --file is required\n";
+        return -1;
+    }
+
+    doris::cloud::PackedFileDebugInfoPB debug_info;
+    uint32_t version = 0;
+    doris::Status st = doris::io::read_packed_file_trailer(FLAGS_file, &debug_info, &version);
+    if (!st.ok()) {
+        std::cerr << "Failed to read packed file trailer: " << st.to_string() << std::endl;
+        return -1;
+    }
+
+    json2pb::Pb2JsonOptions options;
+    options.pretty_json = true;
+    std::string json;
+    std::string json_error;
+    // ProtoMessageToJson returns false on failure. Without this check a conversion error would
+    // print an empty document and still exit with status 0.
+    if (!json2pb::ProtoMessageToJson(debug_info, &json, options, &json_error)) {
+        std::cerr << "Failed to convert packed file trailer to json: " << json_error
+                  << std::endl;
+        return -1;
+    }
+
+    std::cout << "trailer_version: " << version << "\n" << json << std::endl;
+    return 0;
+}
diff --git a/be/test/io/fs/packed_file_manager_test.cpp
b/be/test/io/fs/packed_file_manager_test.cpp
index 38b708ae36d..a99ee158ad0 100644
--- a/be/test/io/fs/packed_file_manager_test.cpp
+++ b/be/test/io/fs/packed_file_manager_test.cpp
@@ -38,6 +38,7 @@
#include "cpp/sync_point.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
+#include "io/fs/packed_file_trailer.h"
#include "io/fs/path.h"
#include "util/coding.h"
#include "util/slice.h"
@@ -607,15 +608,14 @@ TEST_F(PackedFileManagerTest,
AppendPackedFileInfoToFileTail) {
ASSERT_NE(writer, nullptr);
const auto& data = writer->written_data();
- ASSERT_GE(data.size(), payload.size() + sizeof(uint32_t));
- uint32_t trailer_size = decode_fixed32_le(
- reinterpret_cast<const uint8_t*>(data.data() + data.size() -
sizeof(uint32_t)));
- ASSERT_GE(data.size(), payload.size() + sizeof(uint32_t) + trailer_size);
-
- std::string serialized_info =
- data.substr(data.size() - sizeof(uint32_t) - trailer_size,
trailer_size);
- cloud::PackedFileInfoPB parsed_info;
- ASSERT_TRUE(parsed_info.ParseFromString(serialized_info));
+ cloud::PackedFileDebugInfoPB parsed_debug;
+ uint32_t version = 0;
+ auto st = parse_packed_file_trailer(data, &parsed_debug, &version);
+ ASSERT_TRUE(st.ok()) << st;
+ ASSERT_EQ(version, kPackedFileTrailerVersion);
+ ASSERT_TRUE(parsed_debug.has_packed_file_info());
+
+ const auto& parsed_info = parsed_debug.packed_file_info();
ASSERT_EQ(parsed_info.slices_size(), 1);
EXPECT_EQ(parsed_info.slices(0).path(), "trailer_path");
EXPECT_EQ(parsed_info.slices(0).offset(), 0);
diff --git a/be/test/io/packed_file_trailer_test.cpp
b/be/test/io/packed_file_trailer_test.cpp
new file mode 100644
index 00000000000..0028a237095
--- /dev/null
+++ b/be/test/io/packed_file_trailer_test.cpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/packed_file_trailer.h"
+
+#include <gtest/gtest.h>
+#include <unistd.h>
+
+#include <filesystem>
+#include <fstream>
+#include <string>
+
+#include "common/status.h"
+#include "util/coding.h"
+
+namespace doris::io {
+
+using doris::Status;
+
+namespace {
+// Creates a uniquely named empty file in the system temp directory and returns its path.
+// The descriptor from mkstemp() is closed immediately because callers reopen the file by name.
+// A creation failure is reported as a non-fatal test failure instead of silently returning the
+// unexpanded "XXXXXX" template as if it were a usable path.
+std::string unique_temp_file() {
+    auto dir = std::filesystem::temp_directory_path();
+    std::string pattern = (dir / "packed_file_XXXXXX").string();
+    int fd = mkstemp(pattern.data());
+    EXPECT_NE(fd, -1) << "mkstemp failed for template " << pattern;
+    if (fd != -1) {
+        close(fd);
+    }
+    return pattern;
+}
+
+// Overwrites `path` with the raw bytes of `data`. A failed open or write is reported as a
+// non-fatal test failure, so later trailer assertions do not fail for an unrelated reason.
+void write_file(const std::string& path, const std::string& data) {
+    std::ofstream ofs(path, std::ios::binary | std::ios::trunc);
+    ofs.write(data.data(), static_cast<std::streamsize>(data.size()));
+    ofs.flush();
+    EXPECT_TRUE(ofs.good()) << "failed to write " << path;
+}
+} // namespace
+
+// Appends a versioned trailer ([PackedFileDebugInfoPB][length][version]) after some leading
+// file bytes and checks that read_packed_file_trailer() recovers the version and every field.
+TEST(PackedFileTrailerTest, ReadNewFormatTrailer) {
+    cloud::PackedFileDebugInfoPB expected_debug;
+    auto* expected_info = expected_debug.mutable_packed_file_info();
+    expected_info->set_resource_id("resource");
+    expected_info->set_total_slice_num(1);
+    auto* only_slice = expected_info->add_slices();
+    only_slice->set_path("a");
+    only_slice->set_offset(10);
+    only_slice->set_size(20);
+
+    std::string payload;
+    ASSERT_TRUE(expected_debug.SerializeToString(&payload));
+
+    std::string file_content("data");
+    file_content += payload;
+    put_fixed32_le(&file_content, static_cast<uint32_t>(payload.size()));
+    put_fixed32_le(&file_content, kPackedFileTrailerVersion);
+
+    const std::string path = unique_temp_file();
+    write_file(path, file_content);
+
+    uint32_t version = 0;
+    cloud::PackedFileDebugInfoPB parsed;
+    const Status st = read_packed_file_trailer(path, &parsed, &version);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(version, kPackedFileTrailerVersion);
+    ASSERT_TRUE(parsed.has_packed_file_info());
+
+    const auto& got = parsed.packed_file_info();
+    EXPECT_EQ(got.resource_id(), expected_info->resource_id());
+    ASSERT_EQ(got.slices_size(), 1);
+    EXPECT_EQ(got.slices(0).path(), "a");
+    EXPECT_EQ(got.slices(0).offset(), 10);
+    EXPECT_EQ(got.slices(0).size(), 20);
+
+    std::filesystem::remove(path);
+}
+
+// Writes a file ending in the legacy, unversioned trailer ([PackedFileInfoPB][length]) and
+// checks that read_packed_file_trailer() falls back to it and reports version 0.
+TEST(PackedFileTrailerTest, ReadLegacyTrailer) {
+    cloud::PackedFileInfoPB legacy_info;
+    legacy_info.set_total_slice_num(2);
+    legacy_info.set_remaining_slice_bytes(100);
+
+    std::string encoded;
+    ASSERT_TRUE(legacy_info.SerializeToString(&encoded));
+
+    std::string file_content("legacy_payload");
+    file_content += encoded;
+    put_fixed32_le(&file_content, static_cast<uint32_t>(encoded.size()));
+
+    const std::string path = unique_temp_file();
+    write_file(path, file_content);
+
+    uint32_t version = 0;
+    cloud::PackedFileDebugInfoPB parsed;
+    const Status st = read_packed_file_trailer(path, &parsed, &version);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(version, 0);
+    ASSERT_TRUE(parsed.has_packed_file_info());
+
+    const auto& got = parsed.packed_file_info();
+    EXPECT_EQ(got.total_slice_num(), legacy_info.total_slice_num());
+    EXPECT_EQ(got.remaining_slice_bytes(), legacy_info.remaining_slice_bytes());
+
+    std::filesystem::remove(path);
+}
+
+} // namespace doris::io
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index f085ce04244..28c121eec9b 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -115,6 +115,11 @@ CONF_mInt64(check_recycle_task_interval_seconds, "600");
// 10min
CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
+// Maximum attempts when decrementing packed file ref counts; presumably retried on
+// TXN_CONFLICT (the call site is not part of this patch excerpt — confirm).
+CONF_mInt32(decrement_packed_file_ref_counts_retry_times, "10");
+// Maximum attempts for each packed-file transaction in the recycler (correction commit and
+// packed file KV removal are retried on TXN_CONFLICT); values below 1 are treated as 1.
+CONF_mInt32(packed_file_txn_retry_times, "10");
+// randomized interval to reduce conflict storms in FoundationDB, default 5-50ms.
+// Bounds of the retry back-off in milliseconds: a negative minimum is treated as 0, and a
+// maximum below the minimum is raised to the minimum.
+CONF_mInt64(packed_file_txn_retry_sleep_min_ms, "5");
+CONF_mInt64(packed_file_txn_retry_sleep_max_ms, "50");
// force recycler to recycle all useless object.
// **just for TEST**
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index eabe17dce34..c9c6065722f 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -36,8 +36,10 @@
#include <initializer_list>
#include <memory>
#include <numeric>
+#include <random>
#include <string>
#include <string_view>
+#include <thread>
#include <unordered_map>
#include <utility>
#include <variant>
@@ -78,6 +80,22 @@ namespace doris::cloud {
using namespace std::chrono;
+namespace {
+
+// Picks a random back-off, in milliseconds, before retrying a packed-file transaction.
+// The value is drawn uniformly from [packed_file_txn_retry_sleep_min_ms,
+// packed_file_txn_retry_sleep_max_ms]. The lower bound is clamped to be non-negative and the
+// upper bound is raised to at least the lower one. The jitter spreads out concurrent retries.
+int64_t packed_file_retry_sleep_ms() {
+    int64_t lower = config::packed_file_txn_retry_sleep_min_ms;
+    if (lower < 0) {
+        lower = 0;
+    }
+    int64_t upper = config::packed_file_txn_retry_sleep_max_ms;
+    if (upper < lower) {
+        upper = lower;
+    }
+    // One engine per thread, seeded once: no locking or reseeding on each call.
+    thread_local std::mt19937_64 engine(std::random_device {}());
+    return std::uniform_int_distribution<int64_t>(lower, upper)(engine);
+}
+
+// Blocks the calling thread for a randomized back-off before the next packed-file txn retry.
+void sleep_for_packed_file_retry() {
+    const std::chrono::milliseconds backoff(packed_file_retry_sleep_ms());
+    std::this_thread::sleep_for(backoff);
+}
+
+} // namespace
+
// return 0 for success get a key, 1 for key not found, negative for error
[[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key,
std::string& val) {
std::unique_ptr<Transaction> txn;
@@ -1190,132 +1208,160 @@ int
InstanceRecycler::correct_packed_file_info(cloud::PackedFileInfoPB* packed_i
int InstanceRecycler::process_single_packed_file(const std::string& packed_key,
const std::string&
packed_file_path,
PackedFileRecycleStats*
stats) {
- if (stopped()) {
- LOG_WARNING("recycler stopped before processing packed file")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return -1;
- }
-
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to create txn when processing packed file")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("err", err);
- return -1;
- }
+ const int max_retry_times = std::max(1,
config::packed_file_txn_retry_times);
+ bool correction_ok = false;
+ cloud::PackedFileInfoPB packed_info;
- std::string packed_val;
- err = txn->get(packed_key, &packed_val);
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- return 0;
- }
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to get packed file kv")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("err", err);
- return -1;
- }
+ for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
+ if (stopped()) {
+ LOG_WARNING("recycler stopped before processing packed file")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ return -1;
+ }
- cloud::PackedFileInfoPB packed_info;
- if (!packed_info.ParseFromString(packed_val)) {
- LOG_WARNING("failed to parse packed file info")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return -1;
- }
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn when processing packed file")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
+ .tag("err", err);
+ return -1;
+ }
- int64_t now_sec = ::time(nullptr);
- bool corrected = packed_info.corrected();
- bool due =
- config::force_immediate_recycle ||
- now_sec - packed_info.created_at_sec() >=
config::packed_file_correction_delay_seconds;
+ std::string packed_val;
+ err = txn->get(packed_key, &packed_val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get packed file kv")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
+ .tag("err", err);
+ return -1;
+ }
- if (!corrected && due) {
- bool changed = false;
- if (correct_packed_file_info(&packed_info, &changed, packed_file_path,
stats) != 0) {
- LOG_WARNING("correct_packed_file_info failed")
+ if (!packed_info.ParseFromString(packed_val)) {
+ LOG_WARNING("failed to parse packed file info")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
return -1;
}
- if (changed) {
- std::string updated;
- if (!packed_info.SerializeToString(&updated)) {
- LOG_WARNING("failed to serialize packed file info after
correction")
+
+ int64_t now_sec = ::time(nullptr);
+ bool corrected = packed_info.corrected();
+ bool due = config::force_immediate_recycle ||
+ now_sec - packed_info.created_at_sec() >=
+ config::packed_file_correction_delay_seconds;
+
+ if (!corrected && due) {
+ bool changed = false;
+ if (correct_packed_file_info(&packed_info, &changed,
packed_file_path, stats) != 0) {
+ LOG_WARNING("correct_packed_file_info failed")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
return -1;
}
- txn->put(packed_key, updated);
- err = txn->commit();
- if (err == TxnErrorCode::TXN_OK) {
- if (stats) {
- ++stats->num_corrected;
- }
- } else {
- if (err == TxnErrorCode::TXN_CONFLICT) {
- LOG_WARNING("failed to commit correction for packed file
due to conflict")
+ if (changed) {
+ std::string updated;
+ if (!packed_info.SerializeToString(&updated)) {
+ LOG_WARNING("failed to serialize packed file info after
correction")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ return -1;
+ }
+ txn->put(packed_key, updated);
+ err = txn->commit();
+ if (err == TxnErrorCode::TXN_OK) {
+ if (stats) {
+ ++stats->num_corrected;
+ }
} else {
+ if (err == TxnErrorCode::TXN_CONFLICT && attempt <
max_retry_times) {
+ LOG_WARNING(
+ "failed to commit correction for packed file
due to conflict, "
+ "retrying")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ sleep_for_packed_file_retry();
+ packed_info.Clear();
+ continue;
+ }
LOG_WARNING("failed to commit correction for packed file")
.tag("instance_id", instance_id_)
.tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
.tag("err", err);
+ return -1;
}
- return -1;
}
}
+
+ correction_ok = true;
+ break;
}
- txn.reset();
+ if (!correction_ok) {
+ return -1;
+ }
- if (packed_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
packed_info.ref_cnt() == 0) {
- if (!packed_info.has_resource_id() ||
packed_info.resource_id().empty()) {
- LOG_WARNING("packed file missing resource id when recycling")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return -1;
- }
- auto [resource_id, accessor] =
resolve_packed_file_accessor(packed_info.resource_id());
- if (!accessor) {
- LOG_WARNING("no accessor available to delete packed file")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("resource_id", packed_info.resource_id());
- return -1;
- }
- int del_ret = accessor->delete_file(packed_file_path);
- if (del_ret != 0 && del_ret != 1) {
- LOG_WARNING("failed to delete packed file")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("resource_id", resource_id)
- .tag("ret", del_ret);
- return -1;
- }
- if (del_ret == 1) {
- LOG_INFO("packed file already removed")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("resource_id", resource_id);
- } else {
- LOG_INFO("deleted packed file")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("resource_id", resource_id);
- }
+ if (!(packed_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
+ packed_info.ref_cnt() == 0)) {
+ return 0;
+ }
+ if (!packed_info.has_resource_id() || packed_info.resource_id().empty()) {
+ LOG_WARNING("packed file missing resource id when recycling")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path);
+ return -1;
+ }
+ auto [resource_id, accessor] =
resolve_packed_file_accessor(packed_info.resource_id());
+ if (!accessor) {
+ LOG_WARNING("no accessor available to delete packed file")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("resource_id", packed_info.resource_id());
+ return -1;
+ }
+ int del_ret = accessor->delete_file(packed_file_path);
+ if (del_ret != 0 && del_ret != 1) {
+ LOG_WARNING("failed to delete packed file")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("resource_id", resource_id)
+ .tag("ret", del_ret);
+ return -1;
+ }
+ if (del_ret == 1) {
+ LOG_INFO("packed file already removed")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("resource_id", resource_id);
+ } else {
+ LOG_INFO("deleted packed file")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("resource_id", resource_id);
+ }
+
+ for (int del_attempt = 1; del_attempt <= max_retry_times; ++del_attempt) {
std::unique_ptr<Transaction> del_txn;
- err = txn_kv_->create_txn(&del_txn);
+ TxnErrorCode err = txn_kv_->create_txn(&del_txn);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to create txn when removing packed file kv")
.tag("instance_id", instance_id_)
.tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt)
.tag("err", err);
return -1;
}
@@ -1329,6 +1375,7 @@ int InstanceRecycler::process_single_packed_file(const
std::string& packed_key,
LOG_WARNING("failed to re-read packed file kv before removal")
.tag("instance_id", instance_id_)
.tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt)
.tag("err", err);
return -1;
}
@@ -1337,7 +1384,8 @@ int InstanceRecycler::process_single_packed_file(const
std::string& packed_key,
if (!latest_info.ParseFromString(latest_val)) {
LOG_WARNING("failed to parse packed file info before removal")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt);
return -1;
}
@@ -1345,7 +1393,8 @@ int InstanceRecycler::process_single_packed_file(const
std::string& packed_key,
latest_info.ref_cnt() == 0)) {
LOG_INFO("packed file state changed before removal, skip deleting
kv")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt);
return 0;
}
@@ -1371,19 +1420,29 @@ int InstanceRecycler::process_single_packed_file(const
std::string& packed_key,
return 0;
}
if (err == TxnErrorCode::TXN_CONFLICT) {
- LOG_WARNING("failed to remove packed file kv due to conflict")
+ if (del_attempt >= max_retry_times) {
+ LOG_WARNING("failed to remove packed file kv due to conflict
after max retry")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt);
+ return -1;
+ }
+ LOG_WARNING("failed to remove packed file kv due to conflict,
retrying")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return -1;
+ .tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt);
+ sleep_for_packed_file_retry();
+ continue;
}
LOG_WARNING("failed to remove packed file kv")
.tag("instance_id", instance_id_)
.tag("packed_file_path", packed_file_path)
+ .tag("del_attempt", del_attempt)
.tag("err", err);
return -1;
}
- return 0;
+ return -1;
}
int InstanceRecycler::handle_packed_file_kv(std::string_view key,
std::string_view /*value*/,
@@ -2603,6 +2662,7 @@ int
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
return 0;
}
+ const int max_retry_times = std::max(1,
config::decrement_packed_file_ref_counts_retry_times);
int ret = 0;
for (auto& [packed_file_path, small_files] : packed_file_updates) {
if (small_files.empty()) {
@@ -2610,7 +2670,7 @@ int
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
}
bool success = false;
- do {
+ for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -2763,14 +2823,26 @@ int
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
break;
}
if (err == TxnErrorCode::TXN_CONFLICT) {
- LOG_WARNING("packed file info update conflict, not retrying")
+ if (attempt >= max_retry_times) {
+ LOG_WARNING("packed file info update conflict after max
retry")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+ .tag("tablet_id", rs_meta_pb.tablet_id())
+ .tag("changed_files", changed_files)
+ .tag("attempt", attempt);
+ ret = -1;
+ break;
+ }
+ LOG_WARNING("packed file info update conflict, retrying")
.tag("instance_id", instance_id_)
.tag("packed_file_path", packed_file_path)
.tag("rowset_id", rs_meta_pb.rowset_id_v2())
.tag("tablet_id", rs_meta_pb.tablet_id())
- .tag("changed_files", changed_files);
- ret = -1;
- break;
+ .tag("changed_files", changed_files)
+ .tag("attempt", attempt);
+ sleep_for_packed_file_retry();
+ continue;
}
LOG_WARNING("failed to commit packed file info update")
@@ -2782,7 +2854,7 @@ int
InstanceRecycler::decrement_packed_file_ref_counts(const doris::RowsetMetaCl
.tag("changed_files", changed_files);
ret = -1;
break;
- } while (false);
+ }
if (!success) {
ret = -1;
@@ -2832,63 +2904,81 @@ int InstanceRecycler::delete_packed_file_and_kv(const
std::string& packed_file_p
.tag("resource_id", resource_id);
}
- std::unique_ptr<Transaction> del_txn;
- TxnErrorCode err = txn_kv_->create_txn(&del_txn);
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to create txn when removing packed file kv")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("err", err);
- return -1;
- }
+ const int max_retry_times = std::max(1,
config::packed_file_txn_retry_times);
+ for (int attempt = 1; attempt <= max_retry_times; ++attempt) {
+ std::unique_ptr<Transaction> del_txn;
+ TxnErrorCode err = txn_kv_->create_txn(&del_txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn when removing packed file kv")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
+ .tag("err", err);
+ return -1;
+ }
- std::string latest_val;
- err = del_txn->get(packed_key, &latest_val);
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- return 0;
- }
- if (err != TxnErrorCode::TXN_OK) {
- LOG_WARNING("failed to re-read packed file kv before removal")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("err", err);
- return -1;
- }
+ std::string latest_val;
+ err = del_txn->get(packed_key, &latest_val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to re-read packed file kv before removal")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
+ .tag("err", err);
+ return -1;
+ }
- cloud::PackedFileInfoPB latest_info;
- if (!latest_info.ParseFromString(latest_val)) {
- LOG_WARNING("failed to parse packed file info before removal")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return -1;
- }
+ cloud::PackedFileInfoPB latest_info;
+ if (!latest_info.ParseFromString(latest_val)) {
+ LOG_WARNING("failed to parse packed file info before removal")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ return -1;
+ }
- if (!(latest_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
- latest_info.ref_cnt() == 0)) {
- LOG_INFO("packed file state changed before removal, skip deleting kv")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return 0;
- }
+ if (!(latest_info.state() == cloud::PackedFileInfoPB::RECYCLING &&
+ latest_info.ref_cnt() == 0)) {
+ LOG_INFO("packed file state changed before removal, skip deleting
kv")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ return 0;
+ }
- del_txn->remove(packed_key);
- err = del_txn->commit();
- if (err == TxnErrorCode::TXN_OK) {
- LOG_INFO("removed packed file metadata")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
- return 0;
- }
- if (err == TxnErrorCode::TXN_CONFLICT) {
- LOG_WARNING("failed to remove packed file kv due to conflict")
+ del_txn->remove(packed_key);
+ err = del_txn->commit();
+ if (err == TxnErrorCode::TXN_OK) {
+ LOG_INFO("removed packed file metadata")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path);
+ return 0;
+ }
+ if (err == TxnErrorCode::TXN_CONFLICT) {
+ if (attempt >= max_retry_times) {
+ LOG_WARNING("failed to remove packed file kv due to conflict
after max retry")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ return -1;
+ }
+ LOG_WARNING("failed to remove packed file kv due to conflict,
retrying")
+ .tag("instance_id", instance_id_)
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt);
+ sleep_for_packed_file_retry();
+ continue;
+ }
+ LOG_WARNING("failed to remove packed file kv")
.tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path);
+ .tag("packed_file_path", packed_file_path)
+ .tag("attempt", attempt)
+ .tag("err", err);
return -1;
}
- LOG_WARNING("failed to remove packed file kv")
- .tag("instance_id", instance_id_)
- .tag("packed_file_path", packed_file_path)
- .tag("err", err);
return -1;
}
@@ -2907,8 +2997,11 @@ int InstanceRecycler::delete_rowset_data(
// due to aborted schema change.
if (is_formal_rowset) {
std::lock_guard lock(recycled_tablets_mtx_);
- if (recycled_tablets_.count(rs.tablet_id())) {
- continue; // Rowset data has already been deleted
+ if (recycled_tablets_.count(rs.tablet_id()) &&
rs.packed_slice_locations_size() == 0) {
+ // Tablet has been recycled and this rowset has no packed
slices, so file data
+ // should already be gone; skip to avoid redundant deletes.
Rowsets with packed
+ // slice info must still run to decrement packed file ref
counts.
+ continue;
}
}
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index a9f2b2c662d..5cf87bcffe0 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -434,6 +434,7 @@ private:
// return 0 for success otherwise error
int decrement_packed_file_ref_counts(const doris::RowsetMetaCloudPB&
rs_meta_pb);
+ friend class
RecyclerTest_delete_rowset_data_packed_file_respects_recycled_tablet_Test;
int delete_packed_file_and_kv(const std::string& packed_file_path,
const std::string& packed_key,
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index abb1d706998..a308a1cfef8 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -5555,6 +5555,93 @@ TEST(RecyclerTest,
delete_rowset_data_packed_file_single_rowset) {
EXPECT_EQ(1, accessor->exists(packed_file_path));
}
+TEST(RecyclerTest, delete_rowset_data_packed_file_respects_recycled_tablet) {
+    // Regression test for recycle timing: a formal rowset whose tablet is already in
+    // recycled_tablets_ must still be processed by delete_rowset_data when it carries packed
+    // slice locations. Otherwise the packed file's ref count is never decremented and the
+    // packed file (object + KV) leaks.
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    constexpr std::string_view kResourceId = "packed_file_resource_recycled_tablet";
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id(std::string(kResourceId));
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix(std::string(kResourceId));
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto accessor = recycler.accessor_map_.begin()->second;
+
+    doris::TabletSchemaCloudPB schema;
+    schema.set_schema_version(2);
+    schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
+
+    auto rowset = create_rowset(std::string(kResourceId), 123456, 2001, 2, schema, 7201);
+    ASSERT_EQ(0, create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, true));
+
+    // Map every segment of the rowset onto a fixed-size slice of one shared packed file.
+    auto merged_map = rowset.mutable_packed_slice_locations();
+    std::string packed_file_path =
+            fmt::format("merge/{}/{}_recycled.dat", rowset.tablet_id(), rowset.rowset_id_v2());
+    constexpr int64_t kSliceBytes = 256;
+    for (int i = 0; i < rowset.num_segments(); ++i) {
+        std::string small_path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
+        auto& index_pb = (*merged_map)[small_path];
+        index_pb.set_packed_file_path(packed_file_path);
+        index_pb.set_offset(i * kSliceBytes);
+        index_pb.set_size(kSliceBytes);
+    }
+
+    // Create the packed file object and its KV, with one live reference per segment slice.
+    accessor->put_file(packed_file_path, "");
+    PackedFileInfoPB merged_info;
+    merged_info.set_ref_cnt(rowset.num_segments());
+    merged_info.set_total_slice_num(rowset.num_segments());
+    merged_info.set_total_slice_bytes(kSliceBytes * rowset.num_segments());
+    merged_info.set_remaining_slice_bytes(kSliceBytes * rowset.num_segments());
+    merged_info.set_state(PackedFileInfoPB::NORMAL);
+    merged_info.set_resource_id(std::string(kResourceId));
+    for (const auto& [small_path, index_pb] : *merged_map) {
+        auto* small_file = merged_info.add_slices();
+        small_file->set_path(small_path);
+        small_file->set_offset(index_pb.offset());
+        small_file->set_size(index_pb.size());
+        small_file->set_deleted(false);
+        small_file->set_txn_id(next_small_file_txn_id());
+        small_file->set_rowset_id(rowset.rowset_id_v2());
+        small_file->set_tablet_id(rowset.tablet_id());
+    }
+
+    std::string merged_key = packed_file_key({instance_id, packed_file_path});
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+        std::string merged_val;
+        ASSERT_TRUE(merged_info.SerializeToString(&merged_val));
+        txn->put(merged_key, merged_val);
+        ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+    }
+
+    // Simulate tablet already recycled; rowset with packed slices should still be processed.
+    {
+        std::lock_guard lock(recycler.recycled_tablets_mtx_);
+        recycler.recycled_tablets_.insert(rowset.tablet_id());
+    }
+
+    std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
+    rowsets.emplace(rowset.rowset_id_v2(), rowset);
+    RecyclerMetricsContext metrics_ctx(instance_id, "delete_rowset_data");
+    ASSERT_EQ(0, recycler.delete_rowset_data(rowsets, RowsetRecyclingState::FORMAL_ROWSET,
+                                             metrics_ctx));
+
+    // All slices were released, so both the packed file KV and the packed file object must be
+    // gone. Checking only the KV would miss a leaked object in storage.
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+    std::string updated_val;
+    EXPECT_EQ(TxnErrorCode::TXN_KEY_NOT_FOUND, txn->get(merged_key, &updated_val));
+    // exists() == 1 means "not found", the same check delete_rowset_data_packed_file_single_rowset
+    // uses for its packed file.
+    EXPECT_EQ(1, accessor->exists(packed_file_path));
+}
+
TEST(RecyclerTest, delete_rowset_data_packed_file_batch_rowsets) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 30e3a5f4d46..8f502cf439f 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -2157,6 +2157,12 @@ message PackedFileInfoPB {
optional string resource_id = 9;
}
+// Wrapper for packed file debug information. Wrapping PackedFileInfoPB keeps the trailer
+// format extensible for tools that read packed file trailers.
+message PackedFileDebugInfoPB {
+ // Packed file metadata (slices, ref count, state, resource id) recorded in the packed
+ // file's debug trailer. Further debug fields can be added alongside it without changing
+ // PackedFileInfoPB itself.
+ optional PackedFileInfoPB packed_file_info = 1;
+}
+
message UpdatePackedFileInfoRequest {
optional string cloud_unique_id = 1; // For auth
optional string request_ip = 2;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]