amogh-jahagirdar commented on code in PR #7651:
URL: https://github.com/apache/iceberg/pull/7651#discussion_r1199232563
##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java:
##########
@@ -38,12 +38,18 @@
public class RewritePositionDeletesGroup {
private final FileGroupInfo info;
private final List<PositionDeletesScanTask> tasks;
+ private final long maxRewrittenDataSequenceNumber;
private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
public RewritePositionDeletesGroup(FileGroupInfo info,
List<PositionDeletesScanTask> tasks) {
this.info = info;
this.tasks = tasks;
+ this.maxRewrittenDataSequenceNumber =
+ tasks.stream()
+ .map(t -> t.file().dataSequenceNumber())
+ .max(Long::compare)
+ .orElseThrow(() -> new IllegalArgumentException("Empty file
group"));
Review Comment:
Nit: Could we just use `Preconditions.checkArgument` to validate the passed-in task list?
##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java:
##########
@@ -739,4 +789,43 @@ private void checkResult(
size(newDeletes),
result.rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum());
}
+
+ private void checkSequenceNumbers(
+ Table table, List<DeleteFile> rewrittenDeletes, List<DeleteFile>
addedDeletes) {
+ StructLikeMap<List<DeleteFile>> rewrittenFilesPerPartition =
+ groupPerPartition(table, rewrittenDeletes);
+ StructLikeMap<List<DeleteFile>> addedFilesPerPartition =
groupPerPartition(table, addedDeletes);
+ for (StructLike partition : rewrittenFilesPerPartition.keySet()) {
+ Long maxRewrittenSeq =
+ rewrittenFilesPerPartition.get(partition).stream()
+ .map(ContentFile::dataSequenceNumber)
+ .max(Long::compare)
+ .get();
+ List<DeleteFile> addedPartitionFiles =
addedFilesPerPartition.get(partition);
+ if (addedPartitionFiles != null) {
+ addedPartitionFiles.forEach(
+ d ->
+ Assert.assertEquals(
+ "Sequence number should be max of rewritten set",
+ d.dataSequenceNumber(),
+ maxRewrittenSeq));
+ }
+ }
+ }
+
+ private StructLikeMap<List<DeleteFile>> groupPerPartition(
+ Table table, List<DeleteFile> deleteFiles) {
+ StructLikeMap<List<DeleteFile>> result =
+ StructLikeMap.create(Partitioning.partitionType(table));
+ for (DeleteFile deleteFile : deleteFiles) {
+ StructLike partition = deleteFile.partition();
+ List<DeleteFile> partitionFiles = result.get(partition);
+ if (partitionFiles == null) {
+ partitionFiles = Lists.newArrayList();
+ }
Review Comment:
Nit: Please add a newline after the if block.
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1129,4 +1145,39 @@ protected ManifestReader<DeleteFile>
newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}
}
+
+ private static class DeleteFileHolder {
+ private final DeleteFile deleteFile;
+ private final Long dataSequenceNumber;
+
+ /**
+ * Queue a delete file for commit with a given data sequence number
Review Comment:
Nit: Not sure if `queue` is the right word here. It's a bit redundant
considering the class name, but could we just use the word `Holds` or `Contains`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]