XBaith commented on code in PR #3273:
URL: https://github.com/apache/amoro/pull/3273#discussion_r1871314374


##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,155 @@ CloseableIterable<FileEntry> fileScan(
 
   protected ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long 
expireTimestamp) {
-    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
     try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, 
expirationConfig)) {
-      Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
-      entries.forEach(
-          e -> {
-            if (mayExpired(e, partitionFreshness, expireTimestamp)) {
-              fileEntries.add(e);
-            }
-          });
-      fileEntries
-          .parallelStream()
-          .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
-          .forEach(expiredFiles::addFile);
+      boolean expireByPartitionSuccess = false;
+      if (!table.specs().isEmpty()
+          && expirationConfig
+              .getExpirationLevel()
+              .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+        expireByPartitionSuccess =
+            tryExpireByPartition(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
+      if (!expireByPartitionSuccess) {
+        expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return expiredFiles;
   }
 
+  private boolean tryExpireByPartition(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Types.NestedField expirationField =
+        table.schema().findField(expirationConfig.getExpirationField());
+    Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+        buildExpirePartitionFieldsMap(expirationField);
+    // All historical specs have expirationField as the partition field.
+    boolean allSpecsMatch = 
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+    if (allSpecsMatch) {
+      Comparable<?> expirePartitionValue;
+      try {
+        expirePartitionValue =
+            getPartitionUpperBound(expirationConfig, expirationField, 
expireTimestamp);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Failed to get partition upper bound", e);
+        return false;
+      }
+
+      Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+          getExpirePartitionValueMap(
+              expirePartitionFieldsMap, expirationField, expirePartitionValue);
+      entries.forEach(
+          fileEntry -> {
+            List<Boolean> expiredList = new ArrayList<>();
+            ContentFile<?> contentFile = fileEntry.getFile();
+            int fileSpecId = contentFile.specId();
+            for (Map.Entry<Integer, Comparable<?>> entry :

Review Comment:
   Could you rename this variable to something more readable, such as `posToValue`?



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,155 @@ CloseableIterable<FileEntry> fileScan(
 
   protected ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long 
expireTimestamp) {
-    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
     try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, 
expirationConfig)) {
-      Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
-      entries.forEach(
-          e -> {
-            if (mayExpired(e, partitionFreshness, expireTimestamp)) {
-              fileEntries.add(e);
-            }
-          });
-      fileEntries
-          .parallelStream()
-          .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
-          .forEach(expiredFiles::addFile);
+      boolean expireByPartitionSuccess = false;
+      if (!table.specs().isEmpty()
+          && expirationConfig
+              .getExpirationLevel()
+              .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+        expireByPartitionSuccess =
+            tryExpireByPartition(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
+      if (!expireByPartitionSuccess) {
+        expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return expiredFiles;
   }
 
+  private boolean tryExpireByPartition(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Types.NestedField expirationField =
+        table.schema().findField(expirationConfig.getExpirationField());
+    Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+        buildExpirePartitionFieldsMap(expirationField);
+    // All historical specs have expirationField as the partition field.
+    boolean allSpecsMatch = 
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+    if (allSpecsMatch) {
+      Comparable<?> expirePartitionValue;
+      try {
+        expirePartitionValue =
+            getPartitionUpperBound(expirationConfig, expirationField, 
expireTimestamp);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Failed to get partition upper bound", e);
+        return false;
+      }
+
+      Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+          getExpirePartitionValueMap(
+              expirePartitionFieldsMap, expirationField, expirePartitionValue);
+      entries.forEach(
+          fileEntry -> {
+            List<Boolean> expiredList = new ArrayList<>();
+            ContentFile<?> contentFile = fileEntry.getFile();
+            int fileSpecId = contentFile.specId();
+            for (Map.Entry<Integer, Comparable<?>> entry :
+                expirePartitionValueMap.get(fileSpecId).entrySet()) {
+              Comparable<Object> partitionValue =
+                  contentFile.partition().get(entry.getKey(), 
entry.getValue().getClass());
+              boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+              expiredList.add(expired);
+            }
+            if (!expiredList.isEmpty() && 
expiredList.stream().allMatch(Boolean::booleanValue)) {
+              expiredFiles.addFile(fileEntry);
+            }
+          });
+      return true;
+    }
+    return false;
+  }
+
+  private void expireByMetricsUpperBound(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
+    Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
+    entries.forEach(
+        e -> {
+          if (mayExpired(e, partitionFreshness, expireTimestamp)) {
+            fileEntries.add(e);
+          }
+        });
+    fileEntries
+        .parallelStream()
+        .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
+        .forEach(expiredFiles::addFile);
+  }
+
+  private Map<Integer, Map<Integer, PartitionField>> 
buildExpirePartitionFieldsMap(
+      Types.NestedField expireField) {
+    // specId -> (partitionPos -> partitionField)
+    Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new 
HashMap<>();
+    for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
+      int pos = 0;
+      Map<Integer, PartitionField> posToField = new HashMap<>();
+      for (PartitionField field : entry.getValue().fields()) {
+        if (field.sourceId() == expireField.fieldId()) {
+          posToField.put(pos, field);
+        }
+        pos++;
+      }
+      partitionFieldsMap.put(entry.getKey(), posToField);
+    }
+
+    return partitionFieldsMap;
+  }
+
+  private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
+      Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
+      Types.NestedField field,
+      Comparable<?> expireValue) {
+    Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new 
HashMap<>();
+    for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :

Review Comment:
   This nested-map iteration is hard to follow. Could you make it more readable, e.g. by renaming the loop variables to describe what they hold (spec id, partition position, field) or by extracting the inner loop into a small helper method?



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,155 @@ CloseableIterable<FileEntry> fileScan(
 
   protected ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long 
expireTimestamp) {
-    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
     try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, 
expirationConfig)) {
-      Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
-      entries.forEach(
-          e -> {
-            if (mayExpired(e, partitionFreshness, expireTimestamp)) {
-              fileEntries.add(e);
-            }
-          });
-      fileEntries
-          .parallelStream()
-          .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
-          .forEach(expiredFiles::addFile);
+      boolean expireByPartitionSuccess = false;
+      if (!table.specs().isEmpty()
+          && expirationConfig
+              .getExpirationLevel()
+              .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+        expireByPartitionSuccess =
+            tryExpireByPartition(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
+      if (!expireByPartitionSuccess) {
+        expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, 
expiredFiles);
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return expiredFiles;
   }
 
+  private boolean tryExpireByPartition(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Types.NestedField expirationField =
+        table.schema().findField(expirationConfig.getExpirationField());
+    Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+        buildExpirePartitionFieldsMap(expirationField);
+    // All historical specs have expirationField as the partition field.
+    boolean allSpecsMatch = 
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+    if (allSpecsMatch) {
+      Comparable<?> expirePartitionValue;
+      try {
+        expirePartitionValue =
+            getPartitionUpperBound(expirationConfig, expirationField, 
expireTimestamp);
+      } catch (IllegalArgumentException e) {
+        LOG.error("Failed to get partition upper bound", e);
+        return false;
+      }
+
+      Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+          getExpirePartitionValueMap(
+              expirePartitionFieldsMap, expirationField, expirePartitionValue);
+      entries.forEach(
+          fileEntry -> {
+            List<Boolean> expiredList = new ArrayList<>();
+            ContentFile<?> contentFile = fileEntry.getFile();
+            int fileSpecId = contentFile.specId();
+            for (Map.Entry<Integer, Comparable<?>> entry :
+                expirePartitionValueMap.get(fileSpecId).entrySet()) {
+              Comparable<Object> partitionValue =
+                  contentFile.partition().get(entry.getKey(), 
entry.getValue().getClass());
+              boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+              expiredList.add(expired);
+            }
+            if (!expiredList.isEmpty() && 
expiredList.stream().allMatch(Boolean::booleanValue)) {
+              expiredFiles.addFile(fileEntry);
+            }
+          });
+      return true;
+    }
+    return false;
+  }
+
+  private void expireByMetricsUpperBound(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Map<StructLike, DataFileFreshness> partitionFreshness = 
Maps.newConcurrentMap();
+    Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
+    entries.forEach(
+        e -> {
+          if (mayExpired(e, partitionFreshness, expireTimestamp)) {
+            fileEntries.add(e);
+          }
+        });
+    fileEntries
+        .parallelStream()
+        .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
+        .forEach(expiredFiles::addFile);
+  }
+
+  private Map<Integer, Map<Integer, PartitionField>> 
buildExpirePartitionFieldsMap(
+      Types.NestedField expireField) {
+    // specId -> (partitionPos -> partitionField)
+    Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new 
HashMap<>();
+    for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
+      int pos = 0;
+      Map<Integer, PartitionField> posToField = new HashMap<>();
+      for (PartitionField field : entry.getValue().fields()) {
+        if (field.sourceId() == expireField.fieldId()) {
+          posToField.put(pos, field);
+        }
+        pos++;
+      }
+      partitionFieldsMap.put(entry.getKey(), posToField);
+    }
+
+    return partitionFieldsMap;
+  }
+
+  private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
+      Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
+      Types.NestedField field,
+      Comparable<?> expireValue) {
+    Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new 
HashMap<>();
+    for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :
+        expirePartitionFieldsMap.entrySet()) {
+      Map<Integer, Comparable<?>> posToValue = new HashMap<>();
+      for (Map.Entry<Integer, PartitionField> posToField : 
entry.getValue().entrySet()) {
+        posToValue.put(
+            posToField.getKey(),
+            ((SerializableFunction<Comparable<?>, Comparable<?>>)
+                    posToField.getValue().transform().bind(field.type()))
+                .apply(expireValue));

Review Comment:
   What will happen if the partition value type is `Void`? Is it still comparable, or could the bound transform return `null` here and break the `compareTo` call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to