This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7ed063acf fix(sideloading): avoid the expected data deletion during SST side loading (#3342)
7ed063acf is described below

commit 7ed063acfdf525c4f68ced1d4ac3a4108afe6039
Author: Luigi Tagliamonte <[email protected]>
AuthorDate: Tue Jan 20 18:53:23 2026 -0800

    fix(sideloading): avoid the expected data deletion during SST side loading (#3342)
---
 src/storage/storage.cc | 62 ++++++++++++++++++++------------------------------
 1 file changed, 25 insertions(+), 37 deletions(-)

diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 7799e089e..ca0f17daa 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -802,53 +802,41 @@ StatusOr<int> Storage::IngestSST(const std::string 
&sst_dir, const rocksdb::Inge
     return 0;
   }
 
-  std::unordered_map<ColumnFamilyID, std::vector<std::string>> cf_files;
+  // Group SST files by column family
+  const auto &column_families = ColumnFamilyConfigs::ListAllColumnFamilies();
+  std::unordered_map<ColumnFamilyID, std::vector<std::string>> 
column_family_files;
   for (const auto &file : sst_files) {
-    bool matched = false;
-    for (const auto &cf : ColumnFamilyConfigs::ListAllColumnFamilies()) {
-      if (file.find(cf.Name()) != std::string::npos) {
-        cf_files[cf.Id()].push_back(file);
-        matched = true;
-        break;
-      }
-    }
-    if (!matched) {
+    auto iter = std::find_if(column_families.begin(), column_families.end(),
+                             [&file](const auto &cf) { return 
file.find(cf.Name()) != std::string::npos; });
+    if (iter == column_families.end()) {
       return {Status::NotOK, fmt::format("SST file '{}' does not match any 
known column family name", file)};
     }
+    column_family_files[iter->Id()].push_back(file);
   }
 
-  // Process each set of files with the appropriate column family
-  // By importing the specific column family SST files first, we avoid data 
corruption -
-  // if import fails, no data is made available or corrupted in either column 
family
-  // if the metadata import fails, the imported data will be deleted by the 
compaction.
-  rocksdb::Status status;
-  // Process files for each column family except metadata
-  for (const auto &[cf, files] : cf_files) {
-    if (cf == ColumnFamilyID::Metadata) continue;
-    if (files.empty()) continue;
-
-    rocksdb::ColumnFamilyHandle *cf_handle = GetCFHandle(cf);
-
-    status = ingestSST(cf_handle, ingest_options, files);
-    if (!status.ok()) {
-      return {Status::NotOK, status.ToString()};
-    }
+  // Build ingestion arguments for atomic ingestion across all column families
+  // IngestExternalFiles API ingests all files atomically - either all succeed 
or all fail
+  std::vector<rocksdb::IngestExternalFileArg> ingest_args;
+  ingest_args.reserve(column_family_files.size());
+  for (auto &[cf_id, files] : column_family_files) {
+    rocksdb::IngestExternalFileArg arg;
+    arg.column_family = GetCFHandle(cf_id);
+    arg.external_files = std::move(files);
+    arg.options = ingest_options;
+    ingest_args.push_back(std::move(arg));
   }
-  // Process metadata files
-  const auto &metadata_files = cf_files[ColumnFamilyID::Metadata];
-  if (!metadata_files.empty()) {
-    status = ingestSST(GetCFHandle(ColumnFamilyID::Metadata), ingest_options, 
metadata_files);
+
+  if (!ingest_args.empty()) {
+    rocksdb::Status status = db_->IngestExternalFiles(ingest_args);
     if (!status.ok()) {
-      return {Status::NotOK, status.ToString()};
+      ERROR("Failed to atomically ingest SST files across column families: 
{}", status.ToString());
+      return {Status::NotOK,
+              fmt::format("Failed to atomically ingest SST files across column 
families: {}", status.ToString())};
     }
   }
-  return sst_files.size();
-}
 
-rocksdb::Status Storage::ingestSST(rocksdb::ColumnFamilyHandle *cf_handle,
-                                   const rocksdb::IngestExternalFileOptions 
&options,
-                                   const std::vector<std::string> 
&sst_file_names) {
-  return db_->IngestExternalFile(cf_handle, sst_file_names, options);
+  INFO("Successfully ingested {} SST files atomically across all column 
families", sst_files.size());
+  return sst_files.size();
 }
 
 void Storage::FlushBlockCache() { shared_block_cache_->EraseUnRefEntries(); }

Reply via email to