lintingbin commented on code in PR #3273:
URL: https://github.com/apache/amoro/pull/3273#discussion_r1872879681


##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +700,145 @@ CloseableIterable<FileEntry> fileScan(
 
   protected ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long 
expireTimestamp) {
-    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
     try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, 
expirationConfig)) {
-      Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
-      entries.forEach(
-          e -> {
-            if (mayExpired(e, partitionFreshness, expireTimestamp)) {
-              fileEntries.add(e);
-            }
-          });
-      fileEntries
-          .parallelStream()
-          .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
-          .forEach(expiredFiles::addFile);
+      boolean expireByPartitionSuccess = false;
+      if (!table.specs().isEmpty()
+          && expirationConfig
+              .getExpirationLevel()
+              .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+        expireByPartitionSuccess =
+            tryExpireByPartition(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
+      if (!expireByPartitionSuccess) {
+        expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return expiredFiles;
   }
 
+  private boolean tryExpireByPartition(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Types.NestedField expirationField =
+        table.schema().findField(expirationConfig.getExpirationField());
+
+    Comparable<?> upperBound;
+    try {
+      upperBound = getExpireUpperBound(expirationConfig, expirationField, 
expireTimestamp);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Failed to get partition upper bound", e);
+      return false;
+    }
+
+    // all history versions expiration partition upper bound
+    Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound =
+        getAllPartitionUpperBound(expirationField, upperBound);
+
+    if (allPartitionUpperBound != null) {
+      entries.forEach(
+          fileEntry -> {
+            ContentFile<?> contentFile = fileEntry.getFile();
+            int fileSpecId = contentFile.specId();
+            Map<Integer, Comparable<?>> partitionUpperBound =
+                allPartitionUpperBound.get(fileSpecId);
+            for (Map.Entry<Integer, Comparable<?>> partitionPosToValue :
+                partitionUpperBound.entrySet()) {
+              Integer partitionPos = partitionPosToValue.getKey();
+              Comparable<?> partitionUpperBoundValue = 
partitionPosToValue.getValue();
+              Comparable<Object> filePartitionValue =
+                  contentFile.partition().get(partitionPos, 
partitionUpperBoundValue.getClass());
+              if (filePartitionValue.compareTo(partitionUpperBoundValue) >= 0) 
{
+                return;
+              }
+            }
+            expiredFiles.addFile(fileEntry);
+          });
+      return true;
+    }
+    return false;
+  }
+
+  private void expireByMetricsUpperBound(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
+    Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
+    entries.forEach(
+        e -> {
+          if (mayExpired(e, partitionFreshness, expireTimestamp)) {
+            fileEntries.add(e);
+          }
+        });
+    fileEntries
+        .parallelStream()
+        .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
+        .forEach(expiredFiles::addFile);
+  }
+
+  private Map<Integer, Map<Integer, Comparable<?>>> getAllPartitionUpperBound(
+      Types.NestedField expireField, Comparable<?> upperBound) {
+    // specId -> (partitionPos -> partitionUpperBoundValue)
+    Map<Integer, Map<Integer, Comparable<?>>> allPartitionUpperBound = new 
HashMap<>();
+    for (Map.Entry<Integer, PartitionSpec> spec : table.specs().entrySet()) {
+      int pos = 0;
+      Map<Integer, Comparable<?>> partitionUpperBound = new HashMap<>();
+      for (PartitionField field : spec.getValue().fields()) {
+        if (field.sourceId() == expireField.fieldId()) {
+          if (field.transform().isVoid()) {

Review Comment:
   @XBaith I added an `isVoid` check here instead of adding a test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to