klion26 commented on code in PR #3273:
URL: https://github.com/apache/amoro/pull/3273#discussion_r1867126452
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
Review Comment:
After this change,
- if the expiration level is partition, we'll first try to expire by
partition and then, if that fails, fall back to expiring by metric upper bound.
- if the expiration level is data, we'll expire by metric upper bound directly.
Is my understanding correct?
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
+ expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}
+ private boolean tryExpireByPartition(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Types.NestedField expirationField =
+ table.schema().findField(expirationConfig.getExpirationField());
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+ buildExpirePartitionFieldsMap(expirationField);
+ // All historical specs have expirationField as the partition field.
+ boolean allSpecsMatch =
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+ if (allSpecsMatch) {
Review Comment:
It seems we only trigger partition-level expiration when *all* historical
specs contain the expiration partition field. Is there a particular reason
for this restriction?
##########
amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java:
##########
@@ -194,17 +194,11 @@ private void testUnKeyedPartitionLevel() {
List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
- if (expireByStringDate()) {
- expected =
- Lists.newArrayList(
- createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
- } else {
- expected =
- Lists.newArrayList(
- createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"),
- createRecord(3, "333", parseMillis("2022-01-02T12:00:00"),
"2022-01-02T12:00:00"),
- createRecord(4, "444", parseMillis("2022-01-02T19:00:00"),
"2022-01-02T19:00:00"));
- }
+ expected =
Review Comment:
It seems the table is partitioned by `op_time` with the `day` transform, and
will expire data older than 86400s, so the records created on `2022-01-02` and
`2022-01-03` will still be there (because their partitions can't be expired) —
is this understanding correct?
On another note, is there an easier way to know which partition each record
belongs to?
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
+ expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}
+ private boolean tryExpireByPartition(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Types.NestedField expirationField =
+ table.schema().findField(expirationConfig.getExpirationField());
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+ buildExpirePartitionFieldsMap(expirationField);
+ // All historical specs have expirationField as the partition field.
+ boolean allSpecsMatch =
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+ if (allSpecsMatch) {
+ Comparable<?> expirePartitionValue;
+ try {
+ expirePartitionValue =
+ getPartitionUpperBound(expirationConfig, expirationField,
expireTimestamp);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to get partition upper bound", e);
+ return false;
+ }
+
+ Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+ getExpirePartitionValueMap(
+ expirePartitionFieldsMap, expirationField, expirePartitionValue);
+ entries.forEach(
+ fileEntry -> {
+ List<Boolean> expiredList = new ArrayList<>();
+ ContentFile<?> contentFile = fileEntry.getFile();
+ int fileSpecId = contentFile.specId();
+ for (Map.Entry<Integer, Comparable<?>> entry :
+ expirePartitionValueMap.get(fileSpecId).entrySet()) {
+ Comparable<Object> partitionValue =
+ contentFile.partition().get(entry.getKey(),
entry.getValue().getClass());
+ boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+ expiredList.add(expired);
+ }
+ if (!expiredList.isEmpty() &&
expiredList.stream().allMatch(Boolean::booleanValue)) {
+ expiredFiles.addFile(fileEntry);
+ }
+ });
+ return true;
+ }
+ return false;
+ }
+
+ private void expireByMetricsUpperBound(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
+ Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
+ entries.forEach(
+ e -> {
+ if (mayExpired(e, partitionFreshness, expireTimestamp)) {
+ fileEntries.add(e);
+ }
+ });
+ fileEntries
+ .parallelStream()
+ .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
+ .forEach(expiredFiles::addFile);
+ }
+
+ private Map<Integer, Map<Integer, PartitionField>>
buildExpirePartitionFieldsMap(
Review Comment:
Could we add some comments to this function, such as what the key/value of the
return value represents, and a brief description of the logic?
##########
amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java:
##########
@@ -194,17 +194,11 @@ private void testUnKeyedPartitionLevel() {
List<Record> expected;
if (tableTestHelper().partitionSpec().isPartitioned()) {
- if (expireByStringDate()) {
- expected =
- Lists.newArrayList(
- createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
- } else {
- expected =
- Lists.newArrayList(
- createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"),
- createRecord(3, "333", parseMillis("2022-01-02T12:00:00"),
"2022-01-02T12:00:00"),
- createRecord(4, "444", parseMillis("2022-01-02T19:00:00"),
"2022-01-02T19:00:00"));
- }
+ expected =
+ Lists.newArrayList(
+ createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"),
+ createRecord(3, "333", parseMillis("2022-01-02T12:00:00"),
"2022-01-02T12:00:00"),
+ createRecord(4, "444", parseMillis("2022-01-02T19:00:00"),
"2022-01-02T19:00:00"));
} else {
expected =
Lists.newArrayList(
Review Comment:
It seems that if the table is not partitioned, we would not expire any
records. Do I understand this correctly?
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
+ expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}
+ private boolean tryExpireByPartition(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Types.NestedField expirationField =
+ table.schema().findField(expirationConfig.getExpirationField());
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+ buildExpirePartitionFieldsMap(expirationField);
+ // All historical specs have expirationField as the partition field.
+ boolean allSpecsMatch =
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+ if (allSpecsMatch) {
+ Comparable<?> expirePartitionValue;
+ try {
+ expirePartitionValue =
+ getPartitionUpperBound(expirationConfig, expirationField,
expireTimestamp);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to get partition upper bound", e);
+ return false;
+ }
+
+ Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+ getExpirePartitionValueMap(
+ expirePartitionFieldsMap, expirationField, expirePartitionValue);
+ entries.forEach(
+ fileEntry -> {
+ List<Boolean> expiredList = new ArrayList<>();
+ ContentFile<?> contentFile = fileEntry.getFile();
+ int fileSpecId = contentFile.specId();
+ for (Map.Entry<Integer, Comparable<?>> entry :
+ expirePartitionValueMap.get(fileSpecId).entrySet()) {
+ Comparable<Object> partitionValue =
+ contentFile.partition().get(entry.getKey(),
entry.getValue().getClass());
+ boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+ expiredList.add(expired);
+ }
+ if (!expiredList.isEmpty() &&
expiredList.stream().allMatch(Boolean::booleanValue)) {
+ expiredFiles.addFile(fileEntry);
+ }
+ });
+ return true;
+ }
+ return false;
+ }
+
+ private void expireByMetricsUpperBound(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
+ Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
+ entries.forEach(
+ e -> {
+ if (mayExpired(e, partitionFreshness, expireTimestamp)) {
+ fileEntries.add(e);
+ }
+ });
+ fileEntries
+ .parallelStream()
+ .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
+ .forEach(expiredFiles::addFile);
+ }
+
+ private Map<Integer, Map<Integer, PartitionField>>
buildExpirePartitionFieldsMap(
+ Types.NestedField expireField) {
+ Map<Integer, Map<Integer, PartitionField>> partitionFieldsMap = new
HashMap<>();
+ for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
+ int pos = 0;
+ Map<Integer, PartitionField> posToField = new HashMap<>();
+ for (PartitionField field : entry.getValue().fields()) {
+ if (field.sourceId() == expireField.fieldId()) {
+ posToField.put(pos, field);
+ }
+ pos++;
+ }
+ partitionFieldsMap.put(entry.getKey(), posToField);
+ }
+
+ return partitionFieldsMap;
+ }
+
+ private Map<Integer, Map<Integer, Comparable<?>>> getExpirePartitionValueMap(
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap,
+ Types.NestedField field,
+ Comparable<?> expireValue) {
+ Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValue = new
HashMap<>();
+ for (Map.Entry<Integer, Map<Integer, PartitionField>> entry :
+ expirePartitionFieldsMap.entrySet()) {
+ Map<Integer, Comparable<?>> posToValue = new HashMap<>();
+ for (Map.Entry<Integer, PartitionField> posToField :
entry.getValue().entrySet()) {
+ posToValue.put(
+ posToField.getKey(),
+ ((SerializableFunction<Comparable<?>, Comparable<?>>)
+ posToField.getValue().transform().bind(field.type()))
+ .apply(expireValue));
+ }
+ expirePartitionValue.put(entry.getKey(), posToValue);
+ }
+ return expirePartitionValue;
+ }
+
+ private Comparable<?> getPartitionUpperBound(
+ DataExpirationConfig expirationConfig, Types.NestedField field, long
expireTimestamp) {
+ switch (field.type().typeId()) {
Review Comment:
Could you please add some comments on this? Especially for the types
`TIMESTAMP` and `LONG`.
##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java:
##########
@@ -696,26 +702,152 @@ CloseableIterable<FileEntry> fileScan(
protected ExpireFiles expiredFileScan(
DataExpirationConfig expirationConfig, Expression dataFilter, long
expireTimestamp) {
- Map<StructLike, DataFileFreshness> partitionFreshness =
Maps.newConcurrentMap();
ExpireFiles expiredFiles = new ExpireFiles();
try (CloseableIterable<FileEntry> entries = fileScan(table, dataFilter,
expirationConfig)) {
- Queue<FileEntry> fileEntries = new LinkedTransferQueue<>();
- entries.forEach(
- e -> {
- if (mayExpired(e, partitionFreshness, expireTimestamp)) {
- fileEntries.add(e);
- }
- });
- fileEntries
- .parallelStream()
- .filter(e -> willNotRetain(e, expirationConfig, partitionFreshness))
- .forEach(expiredFiles::addFile);
+ boolean expireByPartitionSuccess = false;
+ if (expirationConfig
+ .getExpirationLevel()
+ .equals(DataExpirationConfig.ExpireLevel.PARTITION)) {
+ expireByPartitionSuccess =
+ tryExpireByPartition(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
+ if (!expireByPartitionSuccess) {
+ expireByMetricsUpperBound(entries, expirationConfig, expireTimestamp,
expiredFiles);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
return expiredFiles;
}
+ private boolean tryExpireByPartition(
+ CloseableIterable<FileEntry> entries,
+ DataExpirationConfig expirationConfig,
+ long expireTimestamp,
+ ExpireFiles expiredFiles) {
+ Types.NestedField expirationField =
+ table.schema().findField(expirationConfig.getExpirationField());
+ Map<Integer, Map<Integer, PartitionField>> expirePartitionFieldsMap =
+ buildExpirePartitionFieldsMap(expirationField);
+ // All historical specs have expirationField as the partition field.
+ boolean allSpecsMatch =
expirePartitionFieldsMap.values().stream().noneMatch(Map::isEmpty);
+ if (allSpecsMatch) {
+ Comparable<?> expirePartitionValue;
+ try {
+ expirePartitionValue =
+ getPartitionUpperBound(expirationConfig, expirationField,
expireTimestamp);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to get partition upper bound", e);
+ return false;
+ }
+
+ Map<Integer, Map<Integer, Comparable<?>>> expirePartitionValueMap =
+ getExpirePartitionValueMap(
+ expirePartitionFieldsMap, expirationField, expirePartitionValue);
+ entries.forEach(
+ fileEntry -> {
+ List<Boolean> expiredList = new ArrayList<>();
+ ContentFile<?> contentFile = fileEntry.getFile();
+ int fileSpecId = contentFile.specId();
+ for (Map.Entry<Integer, Comparable<?>> entry :
+ expirePartitionValueMap.get(fileSpecId).entrySet()) {
+ Comparable<Object> partitionValue =
+ contentFile.partition().get(entry.getKey(),
entry.getValue().getClass());
+ boolean expired = partitionValue.compareTo(entry.getValue()) < 0;
+ expiredList.add(expired);
+ }
+ if (!expiredList.isEmpty() &&
expiredList.stream().allMatch(Boolean::booleanValue)) {
Review Comment:
Why do we need the `expiredList.stream().allMatch` check here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]