amogh-jahagirdar commented on code in PR #4578:
URL: https://github.com/apache/iceberg/pull/4578#discussion_r886364809


##########
core/src/main/java/org/apache/iceberg/RemoveSnapshots.java:
##########
@@ -161,21 +176,143 @@ public List<Snapshot> apply() {
 
   private TableMetadata internalApply() {
     this.base = ops.refresh();
+    if (base.snapshots().isEmpty()) {
+      return base;
+    }
 
     Set<Long> idsToRetain = Sets.newHashSet();
-    List<Long> ancestorIds = SnapshotUtil.ancestorIds(base.currentSnapshot(), 
base::snapshot);
-    if (minNumSnapshots >= ancestorIds.size()) {
-      idsToRetain.addAll(ancestorIds);
-    } else {
-      idsToRetain.addAll(ancestorIds.subList(0, minNumSnapshots));
+
+    // Compute the snapshots for each reference
+    Map<SnapshotRef, Set<Long>> refSnapshots = 
computeRefSnapshots(base.refs().values());
+
+    // Identify unreferenced snapshots which should be retained
+    Set<Long> unreferencedSnapshotsToRetain = 
computeUnreferencedSnapshotsToRetain(refSnapshots);
+    idsToRetain.addAll(unreferencedSnapshotsToRetain);
+
+    // Identify refs that should be removed
+    Map<String, SnapshotRef> retainedRefs  = computeRetainedRefs(base.refs());
+    Map<Long, List<String>> retainedIdToRefs = Maps.newHashMap();
+    for (Map.Entry<String, SnapshotRef> retainedRefEntry : 
retainedRefs.entrySet()) {
+      long snapshotId = retainedRefEntry.getValue().snapshotId();
+      retainedIdToRefs.putIfAbsent(snapshotId, Lists.newArrayList());
+      retainedIdToRefs.get(snapshotId).add(retainedRefEntry.getKey());
+      idsToRetain.add(snapshotId);
+    }
+
+    for (long idToRemove : idsToRemove) {
+      List<String> refsForId = retainedIdToRefs.get(idToRemove);
+      Preconditions.checkArgument(refsForId == null,
+          "Cannot expire %s. Still referenced by refs: %s", idToRemove, 
refsForId);
     }
 
-    TableMetadata updateMeta = base.removeSnapshotsIf(snapshot ->
-        idsToRemove.contains(snapshot.snapshotId()) ||
-        (snapshot.timestampMillis() < expireOlderThan && 
!idsToRetain.contains(snapshot.snapshotId())));
-    List<Snapshot> updateSnapshots = updateMeta.snapshots();
-    List<Snapshot> baseSnapshots = base.snapshots();
-    return updateSnapshots.size() != baseSnapshots.size() ? updateMeta : base;
+    Set<Long> branchSnapshotsToRetain = 
computeAllBranchSnapshotsToRetain(retainedRefs.values(), refSnapshots);
+    idsToRetain.addAll(branchSnapshotsToRetain);
+    TableMetadata.Builder updatedMetaBuilder = TableMetadata.buildFrom(base);
+
+    base.snapshots().stream()
+        .map(Snapshot::snapshotId)
+        .filter(snapshot -> !idsToRetain.contains(snapshot))
+        .forEach(idsToRemove::add);
+    updatedMetaBuilder.removeSnapshots(idsToRemove);
+
+    base.refs().keySet().stream()
+        .filter(ref -> !retainedRefs.containsKey(ref))
+        .forEach(updatedMetaBuilder::removeRef);
+
+    return updatedMetaBuilder.build();
+  }
+
+  /**
+   * Helper to compute the mapping of a ref to its snapshots. If it's a 
branch, the snapshots is an ordered set
+   * of all the snapshots on the branch. If it's a tag, the snapshot is a set 
of the single snapshot the tag refers to
+   */
+  private Map<SnapshotRef, Set<Long>> 
computeRefSnapshots(Collection<SnapshotRef> refs) {

Review Comment:
   Yeah, it did feel weird to use the ref as a key. I did it mostly to avoid 
having to pass the entire ref map into the other helper functions and do 
lookups there, but that's not something to optimize for. Updated to use the 
ref name as the key instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to