amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2868175988
##########
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##########
@@ -1882,6 +1891,266 @@ public void testConcurrentDVsForSameDataFile() {
.hasMessageContaining("Found concurrently added DV for %s", dataFile.location());
}
+ @TestTemplate
+ public void testDuplicateDVsAreMerged() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+ DeleteFile deleteFile1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+ DeleteFile deleteFile2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+ DeleteFile deleteFile3 = dvWithPositions(dataFile, fileFactory, 4, 8);
+ RowDelta rowDelta1 =
+ table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).addDeletes(deleteFile3);
+
+ commit(table, rowDelta1, branch);
+
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ assertThat(Iterables.size(addedDeleteFiles)).isEqualTo(1);
+ DeleteFile mergedDV = Iterables.getOnlyElement(addedDeleteFiles);
+
+ assertDVHasDeletedPositions(mergedDV, LongStream.range(0, 8).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsMergedMultipleSpecs() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ // append a partitioned data file
+ DataFile firstSnapshotDataFile = newDataFile("data_bucket=0");
+ commit(table, table.newAppend().appendFile(firstSnapshotDataFile), branch);
+
+ // remove the only partition field to make the spec unpartitioned
+ table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+
+ // append an unpartitioned data file
+ DataFile secondSnapshotDataFile = newDataFile("");
+ commit(table, table.newAppend().appendFile(secondSnapshotDataFile), branch);
+
+ // evolve the spec and add a new partition field
+ table.updateSpec().addField("data").commit();
+
+ // append a data file with the new spec
+ DataFile thirdSnapshotDataFile = newDataFile("data=abc");
+ commit(table, table.newAppend().appendFile(thirdSnapshotDataFile), branch);
+
+ assertThat(table.specs()).hasSize(3);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+ DataFile dataFile = newDataFile("data=xyz");
+ // For each data file, create two DVs covering positions [0,2) and [2,4)
+ DeleteFile deleteFile1a = dvWithPositions(firstSnapshotDataFile, fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(firstSnapshotDataFile, fileFactory, 2, 4);
+ DeleteFile deleteFile2a = dvWithPositions(secondSnapshotDataFile, fileFactory, 0, 2);
+ DeleteFile deleteFile2b = dvWithPositions(secondSnapshotDataFile, fileFactory, 2, 4);
+ DeleteFile deleteFile3a = dvWithPositions(thirdSnapshotDataFile, fileFactory, 0, 2);
+ DeleteFile deleteFile3b = dvWithPositions(thirdSnapshotDataFile, fileFactory, 2, 4);
+
+ commit(
+ table,
+ table
+ .newRowDelta()
+ .addRows(dataFile)
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2a)
+ .addDeletes(deleteFile2b)
+ .addDeletes(deleteFile3a)
+ .addDeletes(deleteFile3b),
+ branch);
+
+ Snapshot snapshot = latestSnapshot(table, branch);
+ // Expect 3 merged DVs, one per data file
+ Iterable<DeleteFile> addedDeleteFiles = snapshot.addedDeleteFiles(table.io());
+ List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+ assertThat(mergedDVs).hasSize(3);
+ // All merged DVs should be written to a single Puffin file, even across specs
+ assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+ .hasSize(1);
+
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv -> Objects.equals(dv.referencedDataFile(), firstSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv ->
+ Objects.equals(dv.referencedDataFile(), secondSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile3 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(
+ dv -> Objects.equals(dv.referencedDataFile(), thirdSnapshotDataFile.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile3, LongStream.range(0, 4).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAreMergedForMultipleReferenceFiles() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile1 = newDataFile("data_bucket=0");
+ DataFile dataFile2 = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+ // For each data file, create two DVs covering positions [0,2) and [2,4)
+ DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+ DeleteFile deleteFile2a = dvWithPositions(dataFile2, fileFactory, 0, 2);
+ DeleteFile deleteFile2b = dvWithPositions(dataFile2, fileFactory, 2, 4);
+
+ // Commit all four duplicate DVs
+ RowDelta rowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2a)
+ .addDeletes(deleteFile2b);
+
+ commit(table, rowDelta, branch);
+
+ // Expect two merged DVs, one per data file
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> mergedDVs = Lists.newArrayList(addedDeleteFiles);
+
+ assertThat(mergedDVs).hasSize(2);
+ // Should be a single Puffin produced
+ assertThat(mergedDVs.stream().map(ContentFile::location).collect(Collectors.toSet()))
+ .hasSize(1);
+
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator);
+
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ mergedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile2.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 4).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAndValidDV() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile1 = newDataFile("data_bucket=0");
+ DataFile dataFile2 = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile1).addRows(dataFile2), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+ // dataFile1 has duplicate DVs that need merging
+ DeleteFile deleteFile1a = dvWithPositions(dataFile1, fileFactory, 0, 2);
+ DeleteFile deleteFile1b = dvWithPositions(dataFile1, fileFactory, 2, 4);
+
+ // dataFile2 has a valid DV
+ DeleteFile deleteFile2 = dvWithPositions(dataFile2, fileFactory, 0, 3);
+
+ RowDelta rowDelta =
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1a)
+ .addDeletes(deleteFile1b)
+ .addDeletes(deleteFile2);
+
+ commit(table, rowDelta, branch);
+
+ // Expect two DVs: the merged DV for dataFile1 and deleteFile2 for dataFile2
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> committedDVs = Lists.newArrayList(addedDeleteFiles);
+
+ assertThat(committedDVs).hasSize(2);
+
+ // Verify merged DV for dataFile1 has positions [0,4)
+ DeleteFile committedDVForDataFile1 =
+ Iterables.getOnlyElement(
+ committedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile1.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile1, LongStream.range(0, 4).boxed()::iterator);
+
+ // Verify deleteFile2 state
+ DeleteFile committedDVForDataFile2 =
+ Iterables.getOnlyElement(
+ committedDVs.stream()
+ .filter(dv -> Objects.equals(dv.referencedDataFile(), dataFile2.location()))
+ .collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDVForDataFile2, LongStream.range(0, 3).boxed()::iterator);
+ }
+
+ @TestTemplate
+ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DataFile dataFile = newDataFile("data_bucket=0");
+ commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+
+ // Two DVs for the same data file: [0,2) and [2,4) => 4 deleted positions total
+ DeleteFile dv1 = dvWithPositions(dataFile, fileFactory, 0, 2);
+ DeleteFile dv2 = dvWithPositions(dataFile, fileFactory, 2, 4);
+
+ // One equality delete file for the same partition
+ DeleteFile eqDelete =
+ newEqualityDeleteFile(
+ table.spec().specId(),
+ "data_bucket=0",
+ table.schema().asStruct().fields().get(0).fieldId());
+
+ RowDelta rowDelta = table.newRowDelta().addDeletes(eqDelete).addDeletes(dv1).addDeletes(dv2);
+
+ commit(table, rowDelta, branch);
+
+ Iterable<DeleteFile> addedDeleteFiles =
+ latestSnapshot(table, branch).addedDeleteFiles(table.io());
+ List<DeleteFile> committedDeletes = Lists.newArrayList(addedDeleteFiles);
+
+ // 1 DV + 1 equality delete
+ assertThat(committedDeletes).hasSize(2);
+
+ DeleteFile committedDV =
+ Iterables.getOnlyElement(
+ committedDeletes.stream().filter(ContentFileUtil::isDV).collect(Collectors.toList()));
+ assertDVHasDeletedPositions(committedDV, LongStream.range(0, 4).boxed()::iterator);
+
+ DeleteFile committedEqDelete =
+ Iterables.getOnlyElement(
+ committedDeletes.stream()
+ .filter(df -> df.content() == FileContent.EQUALITY_DELETES)
+ .collect(Collectors.toList()));
+ assertThat(committedEqDelete).isNotNull();
+ assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
+ }
Review Comment:
Good call, I added tests that exercise these validations (those 3 are accurate)
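
For reference, a minimal sketch of how a `dvWithPositions` test helper could be implemented, assuming it writes the DV through `org.apache.iceberg.deletes.BaseDVFileWriter` (the helper name, signature, and the no-op previous-delete loader are assumptions; the actual helper in TestRowDelta may differ):

```java
// Hypothetical sketch (inside the test class): writes a DV for dataFile marking
// positions [startInclusive, endExclusive) as deleted and returns the resulting DeleteFile.
private DeleteFile dvWithPositions(
    DataFile dataFile, OutputFileFactory fileFactory, long startInclusive, long endExclusive)
    throws IOException {
  PartitionSpec spec = table.specs().get(dataFile.specId());
  DVFileWriter writer = new BaseDVFileWriter(fileFactory, path -> null);
  try (DVFileWriter closingWriter = writer) {
    for (long pos = startInclusive; pos < endExclusive; pos++) {
      closingWriter.delete(dataFile.location(), pos, spec, dataFile.partition());
    }
  }
  // closing the writer flushes the Puffin file; result() then exposes the DV metadata
  return Iterables.getOnlyElement(writer.result().deleteFiles());
}
```

`assertDVHasDeletedPositions` would then read the committed DV back from its Puffin file and compare the set of deleted positions against the expected iterable.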