This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7ed063acf fix(sideloading): avoid the expected data deletion during SST side loading (#3342)
7ed063acf is described below
commit 7ed063acfdf525c4f68ced1d4ac3a4108afe6039
Author: Luigi Tagliamonte <[email protected]>
AuthorDate: Tue Jan 20 18:53:23 2026 -0800
fix(sideloading): avoid the expected data deletion during SST side loading (#3342)
---
src/storage/storage.cc | 62 ++++++++++++++++++++------------------------------
1 file changed, 25 insertions(+), 37 deletions(-)
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 7799e089e..ca0f17daa 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -802,53 +802,41 @@ StatusOr<int> Storage::IngestSST(const std::string &sst_dir, const rocksdb::Inge
return 0;
}
- std::unordered_map<ColumnFamilyID, std::vector<std::string>> cf_files;
+ // Group SST files by column family
+ const auto &column_families = ColumnFamilyConfigs::ListAllColumnFamilies();
+ std::unordered_map<ColumnFamilyID, std::vector<std::string>> column_family_files;
for (const auto &file : sst_files) {
- bool matched = false;
- for (const auto &cf : ColumnFamilyConfigs::ListAllColumnFamilies()) {
- if (file.find(cf.Name()) != std::string::npos) {
- cf_files[cf.Id()].push_back(file);
- matched = true;
- break;
- }
- }
- if (!matched) {
+ auto iter = std::find_if(column_families.begin(), column_families.end(),
+ [&file](const auto &cf) { return file.find(cf.Name()) != std::string::npos; });
+ if (iter == column_families.end()) {
return {Status::NotOK, fmt::format("SST file '{}' does not match any known column family name", file)};
}
+ column_family_files[iter->Id()].push_back(file);
}
- // Process each set of files with the appropriate column family
- // By importing the specific column family SST files first, we avoid data corruption -
- // if import fails, no data is made available or corrupted in either column family
- // if the metadata import fails, the imported data will be deleted by the compaction.
- rocksdb::Status status;
- // Process files for each column family except metadata
- for (const auto &[cf, files] : cf_files) {
- if (cf == ColumnFamilyID::Metadata) continue;
- if (files.empty()) continue;
-
- rocksdb::ColumnFamilyHandle *cf_handle = GetCFHandle(cf);
-
- status = ingestSST(cf_handle, ingest_options, files);
- if (!status.ok()) {
- return {Status::NotOK, status.ToString()};
- }
+ // Build ingestion arguments for atomic ingestion across all column families
+ // IngestExternalFiles API ingests all files atomically - either all succeed or all fail
+ std::vector<rocksdb::IngestExternalFileArg> ingest_args;
+ ingest_args.reserve(column_family_files.size());
+ for (auto &[cf_id, files] : column_family_files) {
+ rocksdb::IngestExternalFileArg arg;
+ arg.column_family = GetCFHandle(cf_id);
+ arg.external_files = std::move(files);
+ arg.options = ingest_options;
+ ingest_args.push_back(std::move(arg));
}
- // Process metadata files
- const auto &metadata_files = cf_files[ColumnFamilyID::Metadata];
- if (!metadata_files.empty()) {
- status = ingestSST(GetCFHandle(ColumnFamilyID::Metadata), ingest_options, metadata_files);
+
+ if (!ingest_args.empty()) {
+ rocksdb::Status status = db_->IngestExternalFiles(ingest_args);
if (!status.ok()) {
- return {Status::NotOK, status.ToString()};
+ ERROR("Failed to atomically ingest SST files across column families: {}", status.ToString());
+ return {Status::NotOK,
+ fmt::format("Failed to atomically ingest SST files across column families: {}", status.ToString())};
}
}
- return sst_files.size();
-}
-rocksdb::Status Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
- const rocksdb::IngestExternalFileOptions &options,
- const std::vector<std::string> &sst_file_names) {
- return db_->IngestExternalFile(cf_handle, sst_file_names, options);
+ INFO("Successfully ingested {} SST files atomically across all column families", sst_files.size());
+ return sst_files.size();
}
void Storage::FlushBlockCache() { shared_block_cache_->EraseUnRefEntries(); }