shangxinli commented on code in PR #648:
URL: https://github.com/apache/iceberg-cpp/pull/648#discussion_r3266810698
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
}
};
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+/// * No snapshot IDs were explicitly listed for expiration.
+/// * No removed snapshots lived outside the current main ancestry.
+/// * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so
+/// two snapshot passes are enough -- one over retained snapshots to learn which
+/// manifests are still live, one over expired snapshots to learn which manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+ using FileCleanupStrategy::FileCleanupStrategy;
+
+ Status CleanFiles(const TableMetadata& metadata_before_expiration,
+ const TableMetadata& metadata_after_expiration,
+ const std::unordered_set<int64_t>& expired_snapshot_ids,
Review Comment:
Done in b57207a — both strategies now derive expired IDs from before/after
metadata.
##########
src/iceberg/test/expire_snapshots_test.cc:
##########
@@ -573,4 +580,75 @@ TEST_F(ExpireSnapshotsCleanupTest,
KeepsReusedPartitionStats) {
EXPECT_THAT(deleted_files,
testing::Not(testing::Contains(reused_statistics_path)));
}
+// Linear-ancestry, no specified ID: dispatch must pick IncrementalFileCleanup.
Review Comment:
Trimmed in b57207a.
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -613,11 +917,23 @@ Status ExpireSnapshots::Finalize(Result<const
TableMetadata*> commit_result) {
apply_result_.reset();
// File cleanup is best-effort: log and continue on individual file deletion failures
- ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
- return strategy.CleanFiles(metadata_before_expiration,
metadata_after_expiration,
- expired_ids, cleanup_level_);
+ // Pick incremental cleanup when the expiration is a simple linear-ancestry walk:
+ // no explicit snapshot IDs, no removed snapshots outside main ancestry, and no
+ // retained snapshots outside main ancestry. Mirrors Java RemoveSnapshots's
+ // dispatch in cleanExpiredSnapshots().
+ bool can_use_incremental = !specified_snapshot_id_ &&
+
!HasRemovedNonMainAncestors(metadata_before_expiration,
+
metadata_after_expiration) &&
+ !HasNonMainSnapshots(metadata_after_expiration);
+
+ std::unique_ptr<FileCleanupStrategy> strategy;
+ if (can_use_incremental) {
+ strategy = std::make_unique<IncrementalFileCleanup>(ctx_->table->io(),
delete_func_);
+ } else {
+ strategy = std::make_unique<ReachableFileCleanup>(ctx_->table->io(),
delete_func_);
+ }
+ return strategy->CleanFiles(metadata_before_expiration,
metadata_after_expiration,
Review Comment:
Done in b57207a — PendingUpdate::Commit() now returns Finalize's status.
Added CommitPropagatesMalformedSourceSnapshotId.
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
}
};
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
Review Comment:
Dropped in b57207a.
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +331,303 @@ class ReachableFileCleanup : public FileCleanupStrategy {
}
};
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+/// * No snapshot IDs were explicitly listed for expiration.
+/// * No removed snapshots lived outside the current main ancestry.
+/// * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so
+/// two snapshot passes are enough -- one over retained snapshots to learn which
+/// manifests are still live, one over expired snapshots to learn which manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+ using FileCleanupStrategy::FileCleanupStrategy;
+
+ Status CleanFiles(const TableMetadata& metadata_before_expiration,
+ const TableMetadata& metadata_after_expiration,
+ const std::unordered_set<int64_t>& expired_snapshot_ids,
+ CleanupLevel level) override {
+ if (expired_snapshot_ids.empty()) {
+ return {};
+ }
+
+ std::unordered_set<int64_t> valid_ids;
+ valid_ids.reserve(metadata_after_expiration.snapshots.size());
+ for (const auto& snapshot : metadata_after_expiration.snapshots) {
+ if (snapshot) {
+ valid_ids.insert(snapshot->snapshot_id);
+ }
+ }
+
+ auto current_result = metadata_before_expiration.SnapshotById(
+ metadata_before_expiration.current_snapshot_id);
+ if (!current_result.has_value() || current_result.value() == nullptr) {
+ return {};
+ }
+
+ // Ancestors of the current table state. Files deleted in a non-ancestor
+ // snapshot may still belong to the current state (rolled-back commits),
+ // so we only physically delete files removed by ancestor snapshots.
+ auto ancestors_result = SnapshotUtil::AncestorsOf(
+ current_result.value()->snapshot_id,
[&metadata_before_expiration](int64_t id) {
+ return metadata_before_expiration.SnapshotById(id);
+ });
+ if (!ancestors_result.has_value()) {
+ return {};
+ }
+ std::unordered_set<int64_t> ancestor_ids;
+ ancestor_ids.reserve(ancestors_result.value().size());
+ for (const auto& ancestor : ancestors_result.value()) {
+ if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+ }
+
+ // Cherry-pick protection: snapshots whose changes were picked into the
+ // current ancestry under a different snapshot id should not be cleaned up.
+ // Iterate the ancestor pointers we already have rather than re-looking-up
+ // each snapshot by id.
+ std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+ for (const auto& ancestor : ancestors_result.value()) {
+ if (!ancestor) continue;
+ const auto& summary = ancestor->summary;
+ auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+ if (it == summary.end()) continue;
+ try {
+ picked_ancestor_snapshot_ids.insert(std::stoll(it->second));
+ } catch (...) {
+ // Malformed source-snapshot-id; skip rather than fail cleanup.
Review Comment:
Fixed in b57207a — both parse sites now propagate via
ICEBERG_ASSIGN_OR_RAISE.
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -613,11 +917,23 @@ Status ExpireSnapshots::Finalize(Result<const
TableMetadata*> commit_result) {
apply_result_.reset();
// File cleanup is best-effort: log and continue on individual file deletion failures
- ReachableFileCleanup strategy(ctx_->table->io(), delete_func_);
- return strategy.CleanFiles(metadata_before_expiration,
metadata_after_expiration,
- expired_ids, cleanup_level_);
+ // Pick incremental cleanup when the expiration is a simple linear-ancestry walk:
+ // no explicit snapshot IDs, no removed snapshots outside main ancestry, and no
+ // retained snapshots outside main ancestry. Mirrors Java RemoveSnapshots's
+ // dispatch in cleanExpiredSnapshots().
+ bool can_use_incremental = !specified_snapshot_id_ &&
+
!HasRemovedNonMainAncestors(metadata_before_expiration,
+
metadata_after_expiration) &&
+ !HasNonMainSnapshots(metadata_after_expiration);
+
+ std::unique_ptr<FileCleanupStrategy> strategy;
Review Comment:
Done in b57207a.
##########
src/iceberg/update/expire_snapshots.cc:
##########
@@ -331,6 +332,309 @@ class ReachableFileCleanup : public FileCleanupStrategy {
}
};
+/// \brief Incremental file cleanup strategy for simple linear-ancestry expirations.
+///
+/// Mirrors Java's IncrementalFileCleanup. Only safe when:
+/// * No snapshot IDs were explicitly listed for expiration.
+/// * No removed snapshots lived outside the current main ancestry.
+/// * No retained snapshots live outside the current main ancestry.
+///
+/// Each manifest is attributed to its writer snapshot via added_snapshot_id, so
+/// two snapshot passes are enough -- one over retained snapshots to learn which
+/// manifests are still live, one over expired snapshots to learn which manifests,
+/// manifest lists, and data files to drop. Cherry-pick protection via
+/// SnapshotSummaryFields::kSourceSnapshotId prevents removing data that was
+/// logically introduced by a snapshot whose changes are still present in the
+/// current state under a different id.
+///
+/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
+class IncrementalFileCleanup : public FileCleanupStrategy {
+ public:
+ using FileCleanupStrategy::FileCleanupStrategy;
+
+ Status CleanFiles(const TableMetadata& metadata_before_expiration,
+ const TableMetadata& metadata_after_expiration,
+ const std::unordered_set<int64_t>& expired_snapshot_ids,
+ CleanupLevel level) override {
+ if (expired_snapshot_ids.empty()) {
+ return {};
+ }
+
+ std::unordered_set<int64_t> valid_ids;
+ valid_ids.reserve(metadata_after_expiration.snapshots.size());
+ for (const auto& snapshot : metadata_after_expiration.snapshots) {
+ if (snapshot) {
+ valid_ids.insert(snapshot->snapshot_id);
+ }
+ }
+
+ auto current_result = metadata_before_expiration.SnapshotById(
+ metadata_before_expiration.current_snapshot_id);
+ if (!current_result.has_value() || current_result.value() == nullptr) {
+ return {};
+ }
+
+ // Ancestors of the current table state. Files deleted in a non-ancestor
+ // snapshot may still belong to the current state (rolled-back commits),
+ // so we only physically delete files removed by ancestor snapshots.
+ auto ancestors_result = SnapshotUtil::AncestorsOf(
+ current_result.value()->snapshot_id,
[&metadata_before_expiration](int64_t id) {
+ return metadata_before_expiration.SnapshotById(id);
+ });
+ if (!ancestors_result.has_value()) {
+ return {};
+ }
+ std::unordered_set<int64_t> ancestor_ids;
+ ancestor_ids.reserve(ancestors_result.value().size());
+ for (const auto& ancestor : ancestors_result.value()) {
+ if (ancestor) ancestor_ids.insert(ancestor->snapshot_id);
+ }
+
+ // Cherry-pick protection: snapshots whose changes were picked into the
+ // current ancestry under a different snapshot id should not be cleaned up.
+ // Iterate the ancestor pointers we already have rather than re-looking-up
+ // each snapshot by id.
+ std::unordered_set<int64_t> picked_ancestor_snapshot_ids;
+ for (const auto& ancestor : ancestors_result.value()) {
+ if (!ancestor) continue;
+ const auto& summary = ancestor->summary;
+ auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId);
+ if (it == summary.end()) continue;
+ try {
+ picked_ancestor_snapshot_ids.insert(std::stoll(it->second));
Review Comment:
Done in b57207a.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]