This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe0df602c22 [fix](packed-file) fix issues in packed file recycler
checker (#59153)
fe0df602c22 is described below
commit fe0df602c22b22b6ade9639bef60f2f51ffc67a0
Author: Luwei <[email protected]>
AuthorDate: Fri Dec 19 02:52:05 2025 +0800
[fix](packed-file) fix issues in packed file recycler checker (#59153)
---
be/src/cloud/config.cpp | 3 +-
be/src/cloud/config.h | 1 -
be/src/io/fs/packed_file_manager.cpp | 20 +++---
be/src/io/fs/packed_file_trailer.cpp | 10 +--
be/src/io/fs/packed_file_trailer.h | 6 +-
be/src/tools/packed_file_tool.cpp | 2 +-
be/test/io/fs/packed_file_manager_test.cpp | 8 +--
be/test/io/packed_file_trailer_test.cpp | 16 ++---
cloud/src/common/config.h | 1 +
cloud/src/recycler/checker.cpp | 83 ++++++++++++++++++----
cloud/src/recycler/recycler.cpp | 4 +-
cloud/src/recycler/recycler.h | 1 -
gensrc/proto/cloud.proto | 4 +-
.../suites/cloud_p0/recycler/test_recycler.groovy | 2 +
14 files changed, 109 insertions(+), 52 deletions(-)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index c52f5b96c20..ec36171f679 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -139,8 +139,7 @@ DEFINE_mInt64(packed_file_time_threshold_ms, "100");
// 100ms
DEFINE_mInt64(packed_file_try_lock_timeout_ms, "5"); // 5ms
DEFINE_mInt64(packed_file_small_file_count_threshold, "100");
DEFINE_mInt64(small_file_threshold_bytes, "1048576"); // 1MB
-DEFINE_mInt64(uploaded_file_retention_seconds, "60"); // 1 minute
-DEFINE_mInt64(index_retention_seconds, "60"); // 1 minute
+DEFINE_mInt64(uploaded_file_retention_seconds, "1800"); // 30 minutes
DEFINE_mInt64(packed_file_cleanup_interval_seconds, "60"); // 1 minute
DEFINE_mBool(enable_standby_passive_compaction, "true");
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 6dd0e34d86f..e800fac28ab 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -183,7 +183,6 @@ DECLARE_mInt64(packed_file_try_lock_timeout_ms);
DECLARE_mInt64(packed_file_small_file_count_threshold);
DECLARE_mInt64(small_file_threshold_bytes);
DECLARE_mInt64(uploaded_file_retention_seconds);
-DECLARE_mInt64(index_retention_seconds);
DECLARE_mInt64(packed_file_cleanup_interval_seconds);
DECLARE_mBool(enable_standby_passive_compaction);
diff --git a/be/src/io/fs/packed_file_manager.cpp
b/be/src/io/fs/packed_file_manager.cpp
index 94afa6d2bfd..377f4cce67e 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -85,24 +85,24 @@ Status append_packed_info_trailer(FileWriter* writer, const
std::string& packed_
packed_file_path);
}
- cloud::PackedFileDebugInfoPB debug_pb;
- debug_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
+ cloud::PackedFileFooterPB footer_pb;
+ footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
- std::string serialized_debug_info;
- if (!debug_pb.SerializeToString(&serialized_debug_info)) {
- return Status::InternalError("Failed to serialize packed file debug
info for {}",
+ std::string serialized_footer;
+ if (!footer_pb.SerializeToString(&serialized_footer)) {
+ return Status::InternalError("Failed to serialize packed file footer
info for {}",
packed_file_path);
}
- if (serialized_debug_info.size() >
+ if (serialized_footer.size() >
std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
- return Status::InternalError("PackedFileDebugInfoPB too large for {}",
packed_file_path);
+ return Status::InternalError("PackedFileFooterPB too large for {}",
packed_file_path);
}
std::string trailer;
- trailer.reserve(serialized_debug_info.size() +
kPackedFileTrailerSuffixSize);
- trailer.append(serialized_debug_info);
- put_fixed32_le(&trailer,
static_cast<uint32_t>(serialized_debug_info.size()));
+ trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize);
+ trailer.append(serialized_footer);
+ put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size()));
put_fixed32_le(&trailer, kPackedFileTrailerVersion);
return writer->append(Slice(trailer));
diff --git a/be/src/io/fs/packed_file_trailer.cpp
b/be/src/io/fs/packed_file_trailer.cpp
index 99a87b9c3da..29ddf9db603 100644
--- a/be/src/io/fs/packed_file_trailer.cpp
+++ b/be/src/io/fs/packed_file_trailer.cpp
@@ -25,7 +25,7 @@
namespace doris::io {
-Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileDebugInfoPB* debug_pb,
+Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileFooterPB* debug_pb,
uint32_t* version) {
if (debug_pb == nullptr || version == nullptr) {
return Status::InvalidArgument("Output parameters must not be null");
@@ -39,14 +39,14 @@ Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileDebugIn
const uint32_t trailer_size = decode_fixed32_le(suffix_ptr);
const uint32_t trailer_version = decode_fixed32_le(suffix_ptr +
sizeof(uint32_t));
- // Preferred format: [PackedFileDebugInfoPB][length][version]
+ // Preferred format: [PackedFileFooterPB][length][version]
if (trailer_size > 0 && trailer_size <= data.size() -
kPackedFileTrailerSuffixSize) {
const size_t payload_offset = data.size() -
kPackedFileTrailerSuffixSize - trailer_size;
std::string_view payload(data.data() + payload_offset, trailer_size);
if (payload.size() >
static_cast<size_t>(std::numeric_limits<int>::max())) {
return Status::InternalError("Packed file trailer payload too
large");
}
- cloud::PackedFileDebugInfoPB parsed_pb;
+ cloud::PackedFileFooterPB parsed_pb;
if (parsed_pb.ParseFromArray(payload.data(),
static_cast<int>(payload.size()))) {
debug_pb->Swap(&parsed_pb);
*version = trailer_version;
@@ -80,8 +80,8 @@ Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileDebugIn
return Status::OK();
}
-Status read_packed_file_trailer(const std::string& file_path,
- cloud::PackedFileDebugInfoPB* debug_pb,
uint32_t* version) {
+Status read_packed_file_trailer(const std::string& file_path,
cloud::PackedFileFooterPB* debug_pb,
+ uint32_t* version) {
if (debug_pb == nullptr || version == nullptr) {
return Status::InvalidArgument("Output parameters must not be null");
}
diff --git a/be/src/io/fs/packed_file_trailer.h
b/be/src/io/fs/packed_file_trailer.h
index 21954b5507c..9741ed95c06 100644
--- a/be/src/io/fs/packed_file_trailer.h
+++ b/be/src/io/fs/packed_file_trailer.h
@@ -29,10 +29,10 @@ namespace doris::io {
constexpr uint32_t kPackedFileTrailerVersion = 1;
constexpr size_t kPackedFileTrailerSuffixSize = sizeof(uint32_t) * 2;
-Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileDebugInfoPB* debug_pb,
+Status parse_packed_file_trailer(std::string_view data,
cloud::PackedFileFooterPB* debug_pb,
uint32_t* version);
-Status read_packed_file_trailer(const std::string& file_path,
- cloud::PackedFileDebugInfoPB* debug_pb,
uint32_t* version);
+Status read_packed_file_trailer(const std::string& file_path,
cloud::PackedFileFooterPB* debug_pb,
+ uint32_t* version);
} // namespace doris::io
diff --git a/be/src/tools/packed_file_tool.cpp
b/be/src/tools/packed_file_tool.cpp
index 0d77d94b9e3..0e5b8b06403 100644
--- a/be/src/tools/packed_file_tool.cpp
+++ b/be/src/tools/packed_file_tool.cpp
@@ -38,7 +38,7 @@ int main(int argc, char** argv) {
return -1;
}
- doris::cloud::PackedFileDebugInfoPB debug_info;
+ doris::cloud::PackedFileFooterPB debug_info;
uint32_t version = 0;
doris::Status st = doris::io::read_packed_file_trailer(FLAGS_file,
&debug_info, &version);
if (!st.ok()) {
diff --git a/be/test/io/fs/packed_file_manager_test.cpp
b/be/test/io/fs/packed_file_manager_test.cpp
index a99ee158ad0..645f0896f1a 100644
--- a/be/test/io/fs/packed_file_manager_test.cpp
+++ b/be/test/io/fs/packed_file_manager_test.cpp
@@ -608,14 +608,14 @@ TEST_F(PackedFileManagerTest,
AppendPackedFileInfoToFileTail) {
ASSERT_NE(writer, nullptr);
const auto& data = writer->written_data();
- cloud::PackedFileDebugInfoPB parsed_debug;
+ cloud::PackedFileFooterPB parsed_footer;
uint32_t version = 0;
- auto st = parse_packed_file_trailer(data, &parsed_debug, &version);
+ auto st = parse_packed_file_trailer(data, &parsed_footer, &version);
ASSERT_TRUE(st.ok()) << st;
ASSERT_EQ(version, kPackedFileTrailerVersion);
- ASSERT_TRUE(parsed_debug.has_packed_file_info());
+ ASSERT_TRUE(parsed_footer.has_packed_file_info());
- const auto& parsed_info = parsed_debug.packed_file_info();
+ const auto& parsed_info = parsed_footer.packed_file_info();
ASSERT_EQ(parsed_info.slices_size(), 1);
EXPECT_EQ(parsed_info.slices(0).path(), "trailer_path");
EXPECT_EQ(parsed_info.slices(0).offset(), 0);
diff --git a/be/test/io/packed_file_trailer_test.cpp
b/be/test/io/packed_file_trailer_test.cpp
index 0028a237095..78e1f17754b 100644
--- a/be/test/io/packed_file_trailer_test.cpp
+++ b/be/test/io/packed_file_trailer_test.cpp
@@ -57,21 +57,21 @@ TEST(PackedFileTrailerTest, ReadNewFormatTrailer) {
slice->set_offset(10);
slice->set_size(20);
- cloud::PackedFileDebugInfoPB debug_pb;
- debug_pb.mutable_packed_file_info()->CopyFrom(info);
+ cloud::PackedFileFooterPB footer_pb;
+ footer_pb.mutable_packed_file_info()->CopyFrom(info);
- std::string serialized_debug;
- ASSERT_TRUE(debug_pb.SerializeToString(&serialized_debug));
+ std::string serialized_footer;
+ ASSERT_TRUE(footer_pb.SerializeToString(&serialized_footer));
std::string file_content = "data";
- file_content.append(serialized_debug);
- put_fixed32_le(&file_content,
static_cast<uint32_t>(serialized_debug.size()));
+ file_content.append(serialized_footer);
+ put_fixed32_le(&file_content,
static_cast<uint32_t>(serialized_footer.size()));
put_fixed32_le(&file_content, kPackedFileTrailerVersion);
auto path = unique_temp_file();
write_file(path, file_content);
- cloud::PackedFileDebugInfoPB parsed;
+ cloud::PackedFileFooterPB parsed;
uint32_t version = 0;
Status st = read_packed_file_trailer(path, &parsed, &version);
ASSERT_TRUE(st.ok()) << st;
@@ -101,7 +101,7 @@ TEST(PackedFileTrailerTest, ReadLegacyTrailer) {
auto path = unique_temp_file();
write_file(path, file_content);
- cloud::PackedFileDebugInfoPB parsed;
+ cloud::PackedFileFooterPB parsed;
uint32_t version = 0;
Status st = read_packed_file_trailer(path, &parsed, &version);
ASSERT_TRUE(st.ok()) << st;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 28c121eec9b..3f7c4616232 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -120,6 +120,7 @@ CONF_mInt32(packed_file_txn_retry_times, "10");
// randomized interval to reduce conflict storms in FoundationDB, default
5-50ms
CONF_mInt64(packed_file_txn_retry_sleep_min_ms, "5");
CONF_mInt64(packed_file_txn_retry_sleep_max_ms, "50");
+CONF_mInt32(recycle_txn_delete_max_retry_times, "10");
// force recycler to recycle all useless object.
// **just for TEST**
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 32687d026e7..51e6f38fc1e 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -992,21 +992,25 @@ int InstanceChecker::do_inverted_check() {
}
for (auto file = list_iter->next(); file.has_value(); file =
list_iter->next()) {
+ const auto& path = file->path;
+ if (path == "data/packed_file" ||
path.starts_with("data/packed_file/")) {
+ continue; // packed_file has dedicated check logic
+ }
++num_scanned;
- int ret = check_segment_file(file->path);
+ int ret = check_segment_file(path);
if (ret != 0) {
LOG(WARNING) << "failed to check segment file, uri=" <<
accessor->uri()
- << " path=" << file->path;
+ << " path=" << path;
if (ret == 1) {
++num_file_leak;
} else {
check_ret = -1;
}
}
- ret = check_inverted_index_file(file->path);
+ ret = check_inverted_index_file(path);
if (ret != 0) {
LOG(WARNING) << "failed to check index file, uri=" <<
accessor->uri()
- << " path=" << file->path;
+ << " path=" << path;
if (ret == 1) {
++num_file_leak;
} else {
@@ -2750,6 +2754,18 @@ int InstanceChecker::do_packed_file_check() {
// Step 1: Scan all rowset metas to collect packed_slice_locations
references
// Use efficient range scan instead of iterating through each tablet_id
+ auto collect_packed_refs = [&](const doris::RowsetMetaCloudPB& rs_meta) {
+ const auto& index_map = rs_meta.packed_slice_locations();
+ for (const auto& [small_file_path, index_pb] : index_map) {
+ if (!index_pb.has_packed_file_path() ||
index_pb.packed_file_path().empty()) {
+ continue;
+ }
+ const std::string& packed_file_path = index_pb.packed_file_path();
+ expected_ref_counts[packed_file_path]++;
+ packed_file_small_files[packed_file_path].insert(small_file_path);
+ }
+ };
+
{
std::string start_key = meta_rowset_key({instance_id_, 0, 0});
std::string end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
@@ -2789,16 +2805,57 @@ int InstanceChecker::do_packed_file_check() {
num_scanned_rowsets++;
- // Check packed_slice_locations in rowset meta
- const auto& index_map = rs_meta.packed_slice_locations();
- for (const auto& [small_file_path, index_pb] : index_map) {
- if (!index_pb.has_packed_file_path() ||
index_pb.packed_file_path().empty()) {
- continue;
- }
- const std::string& packed_file_path =
index_pb.packed_file_path();
- expected_ref_counts[packed_file_path]++;
-
packed_file_small_files[packed_file_path].insert(small_file_path);
+ collect_packed_refs(rs_meta);
+ }
+ start_key.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more() && !stopped());
+ }
+
+ // Rowsets in recycle keys may still hold packed file references while ref
count
+ // updates are pending, so include them when calculating expected
references.
+ {
+ std::string start_key = recycle_rowset_key({instance_id_, 0, ""});
+ std::string end_key = recycle_rowset_key({instance_id_, INT64_MAX,
"\xff"});
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ if (stopped()) {
+ return -1;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn for recycle rowset scan
in packed file check";
+ return -1;
+ }
+
+ err = txn->get(start_key, end_key, &it);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to scan recycle rowset metas, err=" <<
err;
+ check_ret = -1;
+ break;
+ }
+
+ while (it->has_next() && !stopped()) {
+ auto [k, v] = it->next();
+ if (!it->has_next()) {
+ start_key = k;
+ }
+
+ RecycleRowsetPB recycle_rowset;
+ if (!recycle_rowset.ParseFromArray(v.data(), v.size())) {
+ LOG(WARNING) << "malformed recycle rowset, key=" << hex(k);
+ check_ret = -1;
+ continue;
}
+
+ if (!recycle_rowset.has_rowset_meta()) {
+ continue;
+ }
+
+ num_scanned_rowsets++;
+ collect_packed_refs(recycle_rowset.rowset_meta());
}
start_key.push_back('\x00'); // Update to next smallest key for
iteration
} while (it->more() && !stopped());
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index c9c6065722f..c7babdddbcd 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -5338,8 +5338,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
concurrent_delete_executor.add([&]() {
int ret = delete_recycle_txn_kv(k);
if (ret == 1) {
- constexpr int MAX_RETRY = 10;
- for (size_t i = 1; i <= MAX_RETRY; ++i) {
+ const int max_retry = std::max(1,
config::recycle_txn_delete_max_retry_times);
+ for (int i = 1; i <= max_retry; ++i) {
LOG(WARNING) << "txn conflict, retry times=" << i << "
key=" << hex(k);
ret = delete_recycle_txn_kv(k);
// clang-format off
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 5cf87bcffe0..a9f2b2c662d 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -434,7 +434,6 @@ private:
// return 0 for success otherwise error
int decrement_packed_file_ref_counts(const doris::RowsetMetaCloudPB&
rs_meta_pb);
- friend class
RecyclerTest_delete_rowset_data_packed_file_respects_recycled_tablet_Test;
int delete_packed_file_and_kv(const std::string& packed_file_path,
const std::string& packed_key,
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 8f502cf439f..c6de06464fc 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -2157,9 +2157,9 @@ message PackedFileInfoPB {
optional string resource_id = 9;
}
-// Wrapper for packed file debug information. It keeps PackedFileInfoPB
extensible for tools
+// Wrapper for packed file footer information. It keeps PackedFileInfoPB
extensible for tools
// reading packed file trailers.
-message PackedFileDebugInfoPB {
+message PackedFileFooterPB {
optional PackedFileInfoPB packed_file_info = 1;
}
diff --git a/regression-test/suites/cloud_p0/recycler/test_recycler.groovy
b/regression-test/suites/cloud_p0/recycler/test_recycler.groovy
index eeb971cd4ba..3a6e99f1778 100644
--- a/regression-test/suites/cloud_p0/recycler/test_recycler.groovy
+++ b/regression-test/suites/cloud_p0/recycler/test_recycler.groovy
@@ -51,6 +51,8 @@ suite("test_recycler") {
}
}
+ sql """ drop table IF EXISTS __internal_schema.audit_log force """
+
do {
triggerRecycle(token, instanceId)
Thread.sleep(10000)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]