lintingbin commented on code in PR #3273:
URL: https://github.com/apache/amoro/pull/3273#discussion_r1867438981
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
+ expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}
+ private boolean tryExpireByPartition(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Types.NestedField expirationField =
+ table.schema().findField(expirationConfig.getExpirationField());
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+ buildExpirePartitionFieldsMap(expirationField);
+ // All historical specs have expirationField as the partition field.
+ boolean allSpecsMatch =
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+ if (allSpecsMatch) {
+ Comparable<?> expirePartitionValue;
+ try {
+ expirePartitionValue =
+ getPartitionUpperBound(expirationConfig, expirationField,
expireTimestamp);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to get partition upper bound", e);
+ return false;
+ }
+
+ Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+ getExpirePartitionValueMap(
+ expirePartitionFieldsMap, expirationField, expirePartitionValue);
+ entries.forEach(
+ fileEntry -> {
+ List<Boolean> expiredList = new ArrayList<>();
+ ContentFile<?> contentFile = fileEntry.getFile();
+ int fileSpecId = contentFile.specId();
+ for (Map.Entry<Integer, Comparable<?>> entry :
+ expirePartitionValueMap.get(fileSpecId).entrySet()) {
+ Comparable<Object> partitionValue =
+ contentFile.partition().get(entry.getKey(),
entry.getValue().getClass());
+ boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+ expiredList.add(expired);
+ }
+ if (!expiredList.isEmpty() &&
expiredList.stream().allMatch(Boolean::booleanValue)) {
Review Comment:
A single timestamp field such as event_time may be used by multiple
partition fields, e.g. year(event_time), month(event_time), and
day(event_time). A partition should be deleted only when the expiration
condition holds for all of these partition fields together.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]