vamsikarnika commented on code in PR #621:
URL: https://github.com/apache/incubator-xtable/pull/621#discussion_r1907480072


##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergMetadataFileCleaner.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import lombok.extern.log4j.Log4j2;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Log4j2
+public class IcebergMetadataFileCleaner extends IcebergMetadataCleanupStrategy 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergMetadataFileCleaner.class);
+
+  IcebergMetadataFileCleaner(
+      FileIO fileIO,
+      ExecutorService deleteExecutorService,
+      ExecutorService planExecutorService,
+      Consumer<String> deleteFunc) {
+    super(fileIO, deleteExecutorService, planExecutorService, deleteFunc);
+  }
+
+  @Override
+  @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
+  public void cleanFiles(Table table, List<Snapshot> removedSnapshots) {
+    if (table.refs().size() > 1) {
+      throw new UnsupportedOperationException(
+          "Cannot incrementally clean files for tables with more than 1 ref");
+    }
+
+    table.refresh();
+    // clean up the expired snapshots:
+    // 1. Get a list of the snapshots that were removed
+    // 2. Delete any data files that were deleted by those snapshots and are 
not in the table
+    // 3. Delete any manifests that are no longer used by current snapshots
+    // 4. Delete the manifest lists
+
+    Set<Long> validIds = Sets.newHashSet();
+    for (Snapshot snapshot : table.snapshots()) {
+      validIds.add(snapshot.snapshotId());
+    }
+
+    Set<Long> expiredIds = Sets.newHashSet();
+    for (Snapshot snapshot : removedSnapshots) {
+      long snapshotId = snapshot.snapshotId();
+      expiredIds.add(snapshotId);
+    }
+
+    if (expiredIds.isEmpty()) {
+      // if no snapshots were expired, skip cleanup
+      return;
+    }
+
+    SnapshotRef branchToCleanup = Iterables.getFirst(table.refs().values(), 
null);
+    if (branchToCleanup == null) {
+      return;
+    }
+
+    Snapshot latest = table.snapshot(branchToCleanup.snapshotId());
+    Iterable<Snapshot> snapshots = table.snapshots();
+
+    // this is the set of ancestors of the current table state. when removing 
snapshots, this must
+    // only remove files that were deleted in an ancestor of the current table 
state to avoid
+    // physically deleting files that were logically deleted in a commit that 
was rolled back.
+    Set<Long> ancestorIds =
+        Sets.newHashSet(SnapshotUtil.ancestorIds(latest, 
snapshotLookup(removedSnapshots, table)));

Review Comment:
   To look up the ancestors of the latest snapshot, we need the list of all
snapshots that were available before the snapshots were expired. At this point
the table has already committed the expiration, so the expired snapshots can no
longer be found through the table itself. Here we assume that the table's
current snapshots plus the removed snapshots give us the snapshot view as it was
before the commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to