amogh-jahagirdar commented on a change in pull request #4006:
URL: https://github.com/apache/iceberg/pull/4006#discussion_r796079474



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -155,19 +154,88 @@ private TableMetadata internalApply() {
     this.base = ops.refresh();
 
     Set<Long> idsToRetain = Sets.newHashSet();
-    List<Long> ancestorIds = SnapshotUtil.ancestorIds(base.currentSnapshot(), 
base::snapshot);
-    if (minNumSnapshots >= ancestorIds.size()) {
-      idsToRetain.addAll(ancestorIds);
-    } else {
-      idsToRetain.addAll(ancestorIds.subList(0, minNumSnapshots));
+    Map<String, SnapshotRef> references = base.refs();
+    long currentTime = System.currentTimeMillis();
+    Map<String, SnapshotRef> referencesToRetain = base.refs()
+        .entrySet()
+        .stream()
+        .filter(refEntry -> refEntry.getKey().equals(SnapshotRef.MAIN_BRANCH) 
||
+            refEntry.getValue().maxRefAgeMs() == null ||
+            currentTime - refEntry.getValue().timestampMillis() < 
refEntry.getValue().maxRefAgeMs())
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    //All snapshots should be retained
+    if (globalMinSnapshots >= base.snapshots().size()) {
+      idsToRetain.addAll(base.snapshots().stream().map(snapshot -> 
snapshot.snapshotId()).collect(Collectors.toList()));
+    }
+    else {
+      List<SnapshotRef> refs = Lists.newArrayList(references.values());
+      for (SnapshotRef ref : refs) {
+        if (ref.type().equals(SnapshotRefType.BRANCH)) {
+          Snapshot startingSnapshot = base.snapshot(ref.snapshotId());
+          Set<Long> ancestorsToRetain = 
ancestorsToRetain(startingSnapshot.snapshotId(),
+              ref.minSnapshotsToKeep(),
+              ref.maxSnapshotAgeMs(),
+              base,
+              currentTime);
+          idsToRetain.addAll(ancestorsToRetain);
+        }
+      }
+    }
+    //At this point idsToRetain includes all snapshots that are within their 
branches lifecycle AND the global expiration age
+    //but it could still be insufficient for global min snapshots.
+    //We now look in the expiration set to see what are the latest snapshots 
that should be kept
+    //ToDo: This is most likely over-engineered.
+    //There's probably a simpler way to keep track of what are snapshots that 
should be retained for the global
+    Set<Snapshot> snapshotsToExpire = base.snapshots()
+        .stream()
+        .filter(snapshotId -> !idsToRetain.contains(snapshotId))
+        .collect(Collectors.toSet());
+    if (idsToRetain.size() < globalMinSnapshots) {
+      long snapshotsRequiredForGlobal = globalMinSnapshots - 
idsToRetain.size();
+      //Keep a heap of the "snapshotsRequiredForGlobal"-latest snapshots
+      Queue<Snapshot> heap = new PriorityQueue<>(new 
SnapshotTimestampComparator());
+      for (Snapshot snapshot : snapshotsToExpire) {
+        if (heap.size() < snapshotsRequiredForGlobal || 
snapshot.timestampMillis() > heap.peek().timestampMillis()) {
+          heap.offer(snapshot);
+        }
+        if (heap.size() > snapshotsRequiredForGlobal) {
+          heap.poll();
+        }
+      }
+      //At this point the heap should have the elements we want to retain.
+      snapshotsToExpire.removeAll(heap);
     }
 
-    TableMetadata updateMeta = base.removeSnapshotsIf(snapshot ->
-        idsToRemove.contains(snapshot.snapshotId()) ||
-        (snapshot.timestampMillis() < expireOlderThan && 
!idsToRetain.contains(snapshot.snapshotId())));
-    List<Snapshot> updateSnapshots = updateMeta.snapshots();
+    TableMetadata updatedMetadata = base.removeSnapshotsIf(snapshot -> 
snapshotsToExpire.contains(snapshot.snapshotId()));
+    updatedMetadata = updatedMetadata.replaceRefs(referencesToRetain);
+    List<Snapshot> updateSnapshots = updatedMetadata.snapshots();
     List<Snapshot> baseSnapshots = base.snapshots();
-    return updateSnapshots.size() != baseSnapshots.size() ? updateMeta : base;
+    return updateSnapshots.size() != baseSnapshots.size() ? updatedMetadata : 
base;
+  }
+
+
+  private Set<Long> ancestorsToRetain(long startingSnapshotId,
+                                      Integer minSnapshotsToKeep,
+                                      Long maxSnapshotAge,
+                                      TableMetadata tableMetadata,
+                                      long currentTime) {
+    Snapshot startingSnapshot = tableMetadata.snapshot(startingSnapshotId);
+    List<Long> ancestors = SnapshotUtil.ancestorIds(startingSnapshot, 
base::snapshot);
+    int ancestorIdx = 0;
+    Set<Long> ancestorsToRetain = Sets.newHashSet();
+    while (ancestorIdx < ancestors.size()) {
+      Snapshot ancestor = tableMetadata.snapshot(ancestors.get(ancestorIdx));
+      long comparisonAge = maxSnapshotAge == null ? globalExpireOlderThan : 
Math.max(maxSnapshotAge, globalExpireOlderThan);

Review comment:
       This depends on what we want the global expireOlderThan to be used for 
(if at all). As currently implemented, a branch does not really "override" 
the global policy when the global policy only allows older snapshots to be 
expired. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to