lintingbin commented on code in PR #3273:
URL: https://github.com/apache/amoro/pull/3273#discussion_r1867434585


##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
 
   protected ExpireFiles expiredFileScan(
       DataExpirationConfig expirationConfig, Expression dataFilter, long expireTimestamp) {
-    Map<StructLike, DataFileFreshness> partitionFreshness = Maps.newConcurrentMap();
     ExpireFiles expiredFiles = new ExpireFiles();
     try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter, expirationConfig)) {
-      Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
-      entries.forEach(
-          e -> {
-            if (mayExpired(e, partitionFreshness, expireTimestamp)) {
-              fileEntries.add(e);
-            }
-          });
-      fileEntries
-          .parallelStream()
-          .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
-          .forEach(expiredFiles::addFile);
+      boolean expireByPartitionSuccess = false;
+      if (expirationConfig
+          .getExpirationLevel()
+          .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+        expireByPartitionSuccess =
+            tryExpireByPartition(entries, expirationConfig, expireTimestamp, expiredFiles);
+      }
+      if (!expireByPartitionSuccess) {
+        expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp, expiredFiles);
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return expiredFiles;
   }
 
+  private boolean tryExpireByPartition(
+      CloseableIterable<FileEntry> entries,
+      DataExpirationConfig expirationConfig,
+      long expireTimestamp,
+      ExpireFiles expiredFiles) {
+    Types.NestedField expirationField =
+        table.schema().findField(expirationConfig.getExpirationField());
+    Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+        buildExpirePartitionFieldsMap(expirationField);
+    // All historical specs have expirationField as the partition field.
+    boolean allSpecsMatch = expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+    if (allSpecsMatch) {

Review Comment:
   If the table's partition fields have been modified and data expiration is
   configured on the new partition field, data written under the earlier
   partition specs might not be deleted by partition-based expiration.
   Therefore, we have implemented a strict check: every historical partition
   spec must contain a partition field built on the expiration field.
   Otherwise, expiration falls back to the metrics-based path
   (`expireByMetricsUpperBound`).
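
   To make the check concrete, here is a minimal, self-contained sketch of the
   decision. It is not the PR's code: `StrictSpecCheck` and `chooseStrategy` are
   hypothetical names, and plain strings stand in for Iceberg's `PartitionField`
   so the example runs without Iceberg on the classpath. The outer map is keyed
   by spec id, and an empty inner map means that spec has no partition field
   built on the expiration field.

```java
import java.util.HashMap;
import java.util.Map;

public class StrictSpecCheck {

  // specId -> (partition field id -> partition field name), restricted to the
  // partition fields derived from the expiration field. Strings are a stand-in
  // for PartitionField (assumption made for this sketch only).
  static boolean allSpecsMatch(Map<Integer, Map<Integer, String>> expireFieldsBySpec) {
    // Strict check: a single spec without a matching field fails the whole check.
    return expireFieldsBySpec.values().stream().noneMatch(Map::isEmpty);
  }

  // Hypothetical helper: names the path that would be taken.
  static String chooseStrategy(Map<Integer, Map<Integer, String>> expireFieldsBySpec) {
    return allSpecsMatch(expireFieldsBySpec) ? "PARTITION" : "METRICS_UPPER_BOUND";
  }

  public static void main(String[] args) {
    // Partition evolution: spec 0 did not partition by the expiration field.
    Map<Integer, Map<Integer, String>> evolved = new HashMap<>();
    evolved.put(0, Map.of());
    evolved.put(1, Map.of(1000, "ts_day"));
    System.out.println(chooseStrategy(evolved)); // prints METRICS_UPPER_BOUND

    // Every spec partitions by the expiration field.
    Map<Integer, Map<Integer, String>> stable = new HashMap<>();
    stable.put(0, Map.of(1000, "ts_day"));
    stable.put(1, Map.of(1000, "ts_day", 1001, "region"));
    System.out.println(chooseStrategy(stable)); // prints PARTITION
  }
}
```

   The check is deliberately all-or-nothing: files written under a spec that
   lacks the field carry no partition value to compare against the expiration
   timestamp, so one such spec sends the whole scan to the metrics path.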


