wgtmac commented on code in PR #592:
URL: https://github.com/apache/iceberg-cpp/pull/592#discussion_r2965627717


##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+  }
+
+  return {};
+}
+
+Status ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests,
+    std::unordered_set<std::string>& data_files_to_delete) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  // Step 1: Collect all file paths from manifests being deleted
+  for (const auto& manifest_path : manifests_to_delete) {
+    // Find the ManifestFile for this path by scanning expired snapshots
+    for (const auto& snapshot : metadata.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io);
+      if (!manifests_result.has_value()) continue;
+
+      for (const auto& manifest : manifests_result.value()) {
+        if (manifest.manifest_path != manifest_path) continue;
+
+        auto schema_result = metadata.Schema();
+        if (!schema_result.has_value()) continue;
+        auto spec_result = 
metadata.PartitionSpecById(manifest.partition_spec_id);
+        if (!spec_result.has_value()) continue;
+
+        auto reader_result = ManifestReader::Make(
+            manifest, file_io, schema_result.value(), spec_result.value());
+        if (!reader_result.has_value()) continue;
+
+        auto entries_result = reader_result.value()->Entries();
+        if (!entries_result.has_value()) continue;
+
+        for (const auto& entry : entries_result.value()) {
+          if (entry.data_file) {
+            data_files_to_delete.insert(entry.data_file->file_path);
+          }
+        }
+        goto next_manifest;  // Found and processed this manifest, move to next
+      }
+    }
+  next_manifest:;
+  }
+
+  if (data_files_to_delete.empty()) {
+    return {};
+  }
+
+  // Step 2: Remove any files that are still referenced by retained manifests.
+  // This ensures we don't delete files that are shared across manifests.
+  for (const auto& manifest_path : retained_manifests) {
+    if (data_files_to_delete.empty()) break;
+
+    for (const auto& snapshot : metadata.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io);
+      if (!manifests_result.has_value()) continue;
+
+      for (const auto& manifest : manifests_result.value()) {
+        if (manifest.manifest_path != manifest_path) continue;
+
+        auto schema_result = metadata.Schema();
+        if (!schema_result.has_value()) continue;
+        auto spec_result = 
metadata.PartitionSpecById(manifest.partition_spec_id);
+        if (!spec_result.has_value()) continue;
+
+        auto reader_result = ManifestReader::Make(
+            manifest, file_io, schema_result.value(), spec_result.value());
+        if (!reader_result.has_value()) continue;

Review Comment:
   **Inconsistent Behavior (Data Loss Risk):** If reading a **retained** 
manifest fails, the C++ implementation silently skips it (`continue`), whereas 
Java uses `.retry(3)` and `.throwFailureWhenFinished()`. Ignoring a read failure 
here means the live data files listed in that manifest are never subtracted from 
`data_files_to_delete`, which can cause **accidental data loss** (deleting a 
physical file that a retained snapshot still uses). Because we cannot tell which 
files an unreadable retained manifest references, a failure here should abort the 
deletion of data files (e.g., by returning an error) instead of continuing.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+  }
+
+  return {};
+}
+
+Status ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests,
+    std::unordered_set<std::string>& data_files_to_delete) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  // Step 1: Collect all file paths from manifests being deleted
+  for (const auto& manifest_path : manifests_to_delete) {
+    // Find the ManifestFile for this path by scanning expired snapshots
+    for (const auto& snapshot : metadata.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io);
+      if (!manifests_result.has_value()) continue;
+
+      for (const auto& manifest : manifests_result.value()) {
+        if (manifest.manifest_path != manifest_path) continue;
+
+        auto schema_result = metadata.Schema();
+        if (!schema_result.has_value()) continue;
+        auto spec_result = 
metadata.PartitionSpecById(manifest.partition_spec_id);
+        if (!spec_result.has_value()) continue;
+
+        auto reader_result = ManifestReader::Make(
+            manifest, file_io, schema_result.value(), spec_result.value());
+        if (!reader_result.has_value()) continue;
+
+        auto entries_result = reader_result.value()->Entries();
+        if (!entries_result.has_value()) continue;
+
+        for (const auto& entry : entries_result.value()) {
+          if (entry.data_file) {
+            data_files_to_delete.insert(entry.data_file->file_path);
+          }
+        }
+        goto next_manifest;  // Found and processed this manifest, move to next

Review Comment:
   **C++ Style Issue:** Using `goto next_manifest;` to break out of nested 
loops is not idiomatic C++. Consider moving this manifest lookup logic into a 
helper function (e.g., `std::optional<ManifestFile> GetManifestByPath(path)`) 
that simply returns once the manifest is found, which removes the need for `goto`.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+  }
+
+  return {};
+}
+
+Status ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests,
+    std::unordered_set<std::string>& data_files_to_delete) {

Review Comment:
   **C++ Style Issue:** Filling a mutable out-parameter 
(`std::unordered_set<std::string>& data_files_to_delete`) instead of returning 
the value is a less modern C++ pattern. Consider returning 
`Result<std::unordered_set<std::string>>` so the result and any error are carried together.



##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -285,7 +291,247 @@ Result<ExpireSnapshots::ApplyResult> 
ExpireSnapshots::Apply() {
                           });
   }
 
+  // Cache the result for use during Finalize()
+  apply_result_ = result;
+
   return result;
 }
 
+Status ExpireSnapshots::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    return {};
+  }
+
+  if (cleanup_level_ == CleanupLevel::kNone) {
+    return {};
+  }
+
+  if (!apply_result_.has_value() || 
apply_result_->snapshot_ids_to_remove.empty()) {
+    return {};
+  }
+
+  // File cleanup is best-effort: log and continue on individual file deletion 
failures
+  // to avoid blocking metadata updates (matching Java behavior).
+  return CleanExpiredFiles(apply_result_->snapshot_ids_to_remove);
+}
+
+void ExpireSnapshots::DeleteFilePath(const std::string& path) {
+  try {
+    if (delete_func_) {
+      delete_func_(path);
+    } else {
+      auto status = ctx_->table->io()->DeleteFile(path);
+      // Best-effort: ignore NotFound (file already deleted) and other errors.
+      // Java uses suppressFailureWhenFinished + onFailure logging.
+      std::ignore = status;
+    }
+  } catch (...) {
+    // Suppress all exceptions during file cleanup to match Java's
+    // suppressFailureWhenFinished behavior.
+  }
+}
+
+Status ExpireSnapshots::ReadManifestsForSnapshot(
+    int64_t snapshot_id, std::unordered_set<std::string>& manifest_paths) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  auto snapshot_result = metadata.SnapshotById(snapshot_id);
+  if (!snapshot_result.has_value()) {
+    return {};
+  }
+  auto& snapshot = snapshot_result.value();
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  auto manifests_result = snapshot_cache.Manifests(file_io);
+  if (!manifests_result.has_value()) {
+    // Best-effort: skip this snapshot if we can't read its manifests
+    return {};
+  }
+
+  for (const auto& manifest : manifests_result.value()) {
+    manifest_paths.insert(manifest.manifest_path);
+  }
+
+  return {};
+}
+
+Status ExpireSnapshots::FindDataFilesToDelete(
+    const std::unordered_set<std::string>& manifests_to_delete,
+    const std::unordered_set<std::string>& retained_manifests,
+    std::unordered_set<std::string>& data_files_to_delete) {
+  const TableMetadata& metadata = base();
+  auto file_io = ctx_->table->io();
+
+  // Step 1: Collect all file paths from manifests being deleted
+  for (const auto& manifest_path : manifests_to_delete) {
+    // Find the ManifestFile for this path by scanning expired snapshots
+    for (const auto& snapshot : metadata.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io);
+      if (!manifests_result.has_value()) continue;
+
+      for (const auto& manifest : manifests_result.value()) {
+        if (manifest.manifest_path != manifest_path) continue;
+
+        auto schema_result = metadata.Schema();
+        if (!schema_result.has_value()) continue;
+        auto spec_result = 
metadata.PartitionSpecById(manifest.partition_spec_id);
+        if (!spec_result.has_value()) continue;
+
+        auto reader_result = ManifestReader::Make(
+            manifest, file_io, schema_result.value(), spec_result.value());
+        if (!reader_result.has_value()) continue;
+
+        auto entries_result = reader_result.value()->Entries();
+        if (!entries_result.has_value()) continue;
+
+        for (const auto& entry : entries_result.value()) {
+          if (entry.data_file) {
+            data_files_to_delete.insert(entry.data_file->file_path);
+          }
+        }
+        goto next_manifest;  // Found and processed this manifest, move to next
+      }
+    }
+  next_manifest:;
+  }
+
+  if (data_files_to_delete.empty()) {
+    return {};
+  }
+
+  // Step 2: Remove any files that are still referenced by retained manifests.
+  // This ensures we don't delete files that are shared across manifests.
+  for (const auto& manifest_path : retained_manifests) {
+    if (data_files_to_delete.empty()) break;
+
+    for (const auto& snapshot : metadata.snapshots) {
+      if (!snapshot) continue;
+      SnapshotCache snapshot_cache(snapshot.get());
+      auto manifests_result = snapshot_cache.Manifests(file_io);
+      if (!manifests_result.has_value()) continue;
+
+      for (const auto& manifest : manifests_result.value()) {
+        if (manifest.manifest_path != manifest_path) continue;
+
+        auto schema_result = metadata.Schema();
+        if (!schema_result.has_value()) continue;
+        auto spec_result = 
metadata.PartitionSpecById(manifest.partition_spec_id);
+        if (!spec_result.has_value()) continue;
+
+        auto reader_result = ManifestReader::Make(
+            manifest, file_io, schema_result.value(), spec_result.value());
+        if (!reader_result.has_value()) continue;
+
+        auto entries_result = reader_result.value()->Entries();
+        if (!entries_result.has_value()) continue;
+
+        for (const auto& entry : entries_result.value()) {
+          if (entry.data_file) {
+            data_files_to_delete.erase(entry.data_file->file_path);
+          }
+        }
+        goto next_retained;
+      }
+    }
+  next_retained:;
+  }
+
+  return {};
+}
+
+Status ExpireSnapshots::CleanExpiredFiles(
+    const std::vector<int64_t>& expired_snapshot_ids) {
+  const TableMetadata& metadata = base();
+
+  // Build expired and retained snapshot ID sets.
+  // The retained set includes ALL snapshots referenced by any branch or tag,
+  // since Apply() already computed retention across all refs.
+  std::unordered_set<int64_t> expired_id_set(expired_snapshot_ids.begin(),
+                                             expired_snapshot_ids.end());
+  std::unordered_set<int64_t> retained_snapshot_ids;
+  for (const auto& snapshot : metadata.snapshots) {
+    if (snapshot && !expired_id_set.contains(snapshot->snapshot_id)) {
+      retained_snapshot_ids.insert(snapshot->snapshot_id);
+    }
+  }
+
+  // Phase 1: Collect manifest paths from expired and retained snapshots.
+  // TODO(shangxinli): Parallelize manifest collection with a thread pool.
+  std::unordered_set<std::string> expired_manifest_paths;
+  for (int64_t snapshot_id : expired_snapshot_ids) {
+    std::ignore = ReadManifestsForSnapshot(snapshot_id, 
expired_manifest_paths);
+  }
+
+  std::unordered_set<std::string> retained_manifest_paths;
+  for (int64_t snapshot_id : retained_snapshot_ids) {
+    std::ignore = ReadManifestsForSnapshot(snapshot_id, 
retained_manifest_paths);
+  }
+
+  // Phase 2: Prune manifests still referenced by retained snapshots.
+  // Only manifests exclusively in expired snapshots should be deleted.
+  std::unordered_set<std::string> manifests_to_delete;
+  for (const auto& path : expired_manifest_paths) {
+    if (!retained_manifest_paths.contains(path)) {
+      manifests_to_delete.insert(path);
+    }
+  }
+
+  // Phase 3: If cleanup level is kAll, find data files to delete.
+  // Only read entries from manifests being deleted (not all expired 
manifests),
+  // then subtract any files still reachable from retained manifests.
+  if (cleanup_level_ == CleanupLevel::kAll && !manifests_to_delete.empty()) {
+    std::unordered_set<std::string> data_files_to_delete;
+    std::ignore = FindDataFilesToDelete(manifests_to_delete, 
retained_manifest_paths,
+                                        data_files_to_delete);
+
+    // TODO(shangxinli): Parallelize file deletion with a thread pool.
+    for (const auto& path : data_files_to_delete) {
+      DeleteFilePath(path);
+    }
+  }
+
+  // Phase 4: Delete orphaned manifest files.
+  for (const auto& path : manifests_to_delete) {
+    DeleteFilePath(path);
+  }
+
+  // Phase 5: Delete manifest lists from expired snapshots.
+  for (int64_t snapshot_id : expired_snapshot_ids) {
+    auto snapshot_result = metadata.SnapshotById(snapshot_id);
+    if (!snapshot_result.has_value()) continue;
+    auto& snapshot = snapshot_result.value();
+    if (!snapshot->manifest_list.empty()) {
+      DeleteFilePath(snapshot->manifest_list);
+    }
+  }
+
+  // Phase 6: Delete expired statistics files.
+  // Use set difference between before and after states (matching Java 
behavior).
+  // Since Finalize runs before table_ is updated, "after" is base() minus 
expired.
+  std::unordered_set<int64_t> retained_stats_snapshots(retained_snapshot_ids);
+  for (const auto& stat_file : metadata.statistics) {
+    if (stat_file && 
!retained_stats_snapshots.contains(stat_file->snapshot_id)) {

Review Comment:
   **Inconsistent Behavior:** C++ decides which statistics files to delete by 
checking whether each `StatisticsFile`'s `snapshot_id` is in the 
`retained_stats_snapshots` set, whereas Java computes a set difference of the actual 
**file paths** (paths referenced before expiration minus paths still referenced 
after). If the same statistics file path is referenced by multiple snapshots, 
deleting purely because one `snapshot_id` expired could erroneously delete a 
physical file that a newer, retained snapshot still references. Diff paths, not 
just snapshot IDs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to