szehon-ho commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r880824910
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -137,93 +146,81 @@ protected boolean shouldKeep(T row) {
}
}
- private static class PositionSetDeleteFilter<T> extends Filter<T> {
- private final Function<T, Long> rowToPosition;
- private final PositionDeleteIndex deleteSet;
-
- private PositionSetDeleteFilter(Function<T, Long> rowToPosition,
PositionDeleteIndex deleteSet) {
- this.rowToPosition = rowToPosition;
- this.deleteSet = deleteSet;
- }
-
- @Override
- protected boolean shouldKeep(T row) {
- return !deleteSet.isDeleted(rowToPosition.apply(row));
- }
- }
-
private static class PositionStreamDeleteFilter<T> extends CloseableGroup
implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
+ private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> extractPos;
- private final CloseableIterable<Long> deletePositions;
+ private long nextDeletePos;
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T,
Long> extractPos,
Review Comment:
Unrelated, but should we keep this consistent: extractPos => rowToPosition?
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -137,93 +146,81 @@ protected boolean shouldKeep(T row) {
}
}
- private static class PositionSetDeleteFilter<T> extends Filter<T> {
- private final Function<T, Long> rowToPosition;
- private final PositionDeleteIndex deleteSet;
-
- private PositionSetDeleteFilter(Function<T, Long> rowToPosition,
PositionDeleteIndex deleteSet) {
- this.rowToPosition = rowToPosition;
- this.deleteSet = deleteSet;
- }
-
- @Override
- protected boolean shouldKeep(T row) {
- return !deleteSet.isDeleted(rowToPosition.apply(row));
- }
- }
-
private static class PositionStreamDeleteFilter<T> extends CloseableGroup
implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
+ private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> extractPos;
- private final CloseableIterable<Long> deletePositions;
+ private long nextDeletePos;
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T,
Long> extractPos,
CloseableIterable<Long>
deletePositions) {
this.rows = rows;
this.extractPos = extractPos;
- this.deletePositions = deletePositions;
+ this.deletePosIterator = deletePositions.iterator();
}
@Override
public CloseableIterator<T> iterator() {
- CloseableIterator<Long> deletePosIterator = deletePositions.iterator();
-
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
- iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
+ nextDeletePos = deletePosIterator.next();
+ iter = createPosDeleteIterator(rows.iterator());
} else {
iter = rows.iterator();
- try {
- deletePosIterator.close();
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to close delete positions
iterator", e);
- }
}
addCloseable(iter);
+ addCloseable(deletePosIterator);
return iter;
}
- private class PositionFilterIterator extends FilterIterator<T> {
- private final CloseableIterator<Long> deletePosIterator;
- private long nextDeletePos;
+ boolean isDeleted(T row) {
Review Comment:
Nit: I think we can follow
https://iceberg.apache.org/contribute/#method-naming and drop the 'is' prefix,
as it doesn't add any value.
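For example (same logic as the hunk above, just without the 'is' prefix):
```
boolean deleted(T row) {
  long currentPos = extractPos.apply(row);
  if (currentPos < nextDeletePos) {
    return false;
  }

  // consume delete positions until the next is past the current position
  boolean deleted = currentPos == nextDeletePos;
  // ... rest unchanged
}
```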
##########
core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java:
##########
@@ -117,6 +118,30 @@ public void testPositionStreamRowFilter() {
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0,
Long.class))));
}
+ @Test
+ public void testPositionStreamRowDeleteMarker() {
+ CloseableIterable<StructLike> rows =
CloseableIterable.withNoopClose(Lists.newArrayList(
+ Row.of(0L, "a", false),
+ Row.of(1L, "b", false),
+ Row.of(2L, "c", false),
+ Row.of(3L, "d", false),
+ Row.of(4L, "e", false),
+ Row.of(5L, "f", false),
+ Row.of(6L, "g", false),
+ Row.of(7L, "h", false),
+ Row.of(8L, "i", false),
+ Row.of(9L, "j", false)
+ ));
+
+ CloseableIterable<Long> deletes =
CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));
+
+ CloseableIterable<StructLike> actual = Deletes.streamingMarker(rows, row
-> row.get(0, Long.class), deletes,
Review Comment:
Nit: can we use inline comments to make it easier to read:
```
Deletes.streamingMarker(rows,
    row -> row.get(0, Long.class), /* row to position */
    deletes,
    row -> row.set(2, true) /* delete marker */
);
```
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -226,10 +226,16 @@ private CloseableIterable<T>
applyPosDeletes(CloseableIterable<T> records) {
// if there are fewer deletes than a reasonable number to keep in memory,
use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() <
setFilterThreshold) {
- return Deletes.filter(records, this::pos,
Deletes.toPositionIndex(filePath, deletes));
+ PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath,
deletes);
+ Predicate<T> isInDeleteSet = record ->
positionIndex.isDeleted(pos(record));
+ return hasColumnIsDeleted ?
+ Deletes.markDeleted(records, isInDeleteSet, this::markRowDeleted) :
Review Comment:
Just personal preference, but maybe we can push the pos function call down to
Deletes (as we do for the streaming path, but not the non-streaming one):
```
PositionDeleteIndex positionDeleteIndex = Deletes.deletePositions(filePath, deletes);
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
  return hasColumnIsDeleted ?
      Deletes.markDeleted(records, this::pos, positionDeleteIndex, this::markDeleted) :
      Deletes.filterDeleted(records, this::pos, positionDeleteIndex);
}
```
where Deletes.markDeleted is just a wrapper that builds the Predicate.
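Roughly, the wrapper could look like this (just a sketch; the Consumer type for the marker callback is an assumption):
```
public static <T> CloseableIterable<T> markDeleted(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
                                                   PositionDeleteIndex deletePositions, Consumer<T> markRowDeleted) {
  // build the Predicate internally so callers only pass the position function and the index
  Predicate<T> deleted = row -> deletePositions.isDeleted(rowToPosition.apply(row));
  return markDeleted(rows, deleted, markRowDeleted);
}
```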
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
Review Comment:
Same
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
Review Comment:
How about just using Assume? That way JUnit properly reports the test as skipped.
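e.g. the early return could become:
```
Assume.assumeFalse("Should not run in vectorized mode", vectorized);
```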
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
+ StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
Review Comment:
assertEquals
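i.e. something like:
```
Assert.assertEquals("Table should contain expected rows", expected, actual);
```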
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
Review Comment:
Nit: the comment may make more sense directly above the spark.read() line
instead of above the expectedRowSet line. Or it can be removed, I think.
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -169,30 +176,23 @@ public CloseableIterable<T>
findEqualityDeleteRows(CloseableIterable<T> records)
.reduce(Predicate::or)
.orElse(t -> false);
- Filter<T> deletedRowsFilter = new Filter<T>() {
- @Override
- protected boolean shouldKeep(T item) {
- return deletedRows.test(item);
- }
- };
- return deletedRowsFilter.filter(records);
+ return CloseableIterable.filter(records, deletedRows);
}
private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
- // Predicate to test whether a row should be visible to user after
applying equality deletions.
- Predicate<T> remainingRows = applyEqDeletes().stream()
- .map(Predicate::negate)
- .reduce(Predicate::and)
- .orElse(t -> true);
-
- Filter<T> remainingRowsFilter = new Filter<T>() {
- @Override
- protected boolean shouldKeep(T item) {
- return remainingRows.test(item);
- }
- };
+ Predicate<T> isEqDeleted = applyEqDeletes().stream()
Review Comment:
Nit: Same variable suggestion, 'deleted' or 'eqDeleted'
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
+ StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+
+ // get deleted rows
+ StructLikeSet expectedDeleted = expectedRowSet(false, true, 29, 43, 61,
89);
+
+ df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = true");
+
+ StructLikeSet actualDeleted = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actualDeleted.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actualDeleted.equals(expectedDeleted));
+ }
+
+ private static final Schema PROJECTION_SCHEMA = new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()),
+ MetadataColumns.IS_DELETED
+ );
+
+ private static StructLikeSet expectedRowSet(int... idsToRemove) {
+ return expectedRowSet(false, false, idsToRemove);
+ }
+
+ private static StructLikeSet expectedRowSet(boolean removeDeleted, boolean
removeUndeleted, int... idsToRemove) {
Review Comment:
Should it be nonDeleted instead of unDeleted? (unDeleted sounds like it was
deleted and then put back)
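i.e. roughly:
```
private static StructLikeSet expectedRowSet(boolean removeDeleted, boolean removeNonDeleted, int... idsToRemove)
```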
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -137,93 +146,81 @@ protected boolean shouldKeep(T row) {
}
}
- private static class PositionSetDeleteFilter<T> extends Filter<T> {
- private final Function<T, Long> rowToPosition;
- private final PositionDeleteIndex deleteSet;
-
- private PositionSetDeleteFilter(Function<T, Long> rowToPosition,
PositionDeleteIndex deleteSet) {
- this.rowToPosition = rowToPosition;
- this.deleteSet = deleteSet;
- }
-
- @Override
- protected boolean shouldKeep(T row) {
- return !deleteSet.isDeleted(rowToPosition.apply(row));
- }
- }
-
private static class PositionStreamDeleteFilter<T> extends CloseableGroup
implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
+ private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> extractPos;
- private final CloseableIterable<Long> deletePositions;
+ private long nextDeletePos;
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T,
Long> extractPos,
CloseableIterable<Long>
deletePositions) {
this.rows = rows;
this.extractPos = extractPos;
- this.deletePositions = deletePositions;
+ this.deletePosIterator = deletePositions.iterator();
}
@Override
public CloseableIterator<T> iterator() {
- CloseableIterator<Long> deletePosIterator = deletePositions.iterator();
-
CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
- iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
+ nextDeletePos = deletePosIterator.next();
+ iter = createPosDeleteIterator(rows.iterator());
} else {
iter = rows.iterator();
- try {
- deletePosIterator.close();
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to close delete positions
iterator", e);
- }
}
addCloseable(iter);
+ addCloseable(deletePosIterator);
return iter;
}
- private class PositionFilterIterator extends FilterIterator<T> {
- private final CloseableIterator<Long> deletePosIterator;
- private long nextDeletePos;
+ boolean isDeleted(T row) {
+ long currentPos = extractPos.apply(row);
+ if (currentPos < nextDeletePos) {
+ return false;
+ }
- protected PositionFilterIterator(CloseableIterator<T> items,
CloseableIterator<Long> deletePositions) {
- super(items);
- this.deletePosIterator = deletePositions;
+ // consume delete positions until the next is past the current position
+ boolean isDeleted = currentPos == nextDeletePos;
Review Comment:
Same (can remove 'is')
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -226,10 +226,16 @@ private CloseableIterable<T>
applyPosDeletes(CloseableIterable<T> records) {
// if there are fewer deletes than a reasonable number to keep in memory,
use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() <
setFilterThreshold) {
- return Deletes.filter(records, this::pos,
Deletes.toPositionIndex(filePath, deletes));
+ PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath,
deletes);
Review Comment:
Extract this to a separate variable on top, and use it outside the else block
as well?
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -226,10 +226,16 @@ private CloseableIterable<T>
applyPosDeletes(CloseableIterable<T> records) {
// if there are fewer deletes than a reasonable number to keep in memory,
use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() <
setFilterThreshold) {
- return Deletes.filter(records, this::pos,
Deletes.toPositionIndex(filePath, deletes));
+ PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath,
deletes);
+ Predicate<T> isInDeleteSet = record ->
positionIndex.isDeleted(pos(record));
Review Comment:
Nit: same, 'inDeleteSet' or just 'deleted'
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
Review Comment:
Same, assertEquals
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
+ StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+
+ // get deleted rows
+ StructLikeSet expectedDeleted = expectedRowSet(false, true, 29, 43, 61,
89);
+
+ df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = true");
+
+ StructLikeSet actualDeleted = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actualDeleted.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actualDeleted.equals(expectedDeleted));
Review Comment:
assertEquals
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
Review Comment:
Assume
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
+ StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+
+ // get deleted rows
+ StructLikeSet expectedDeleted = expectedRowSet(false, true, 29, 43, 61,
89);
Review Comment:
Nit: can put inline comments on the true/false arguments, as described in
https://iceberg.apache.org/contribute/#boolean-arguments
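e.g. (with the current parameter names):
```
StructLikeSet expectedDeleted = expectedRowSet(false /* removeDeleted */, true /* removeUndeleted */, 29, 43, 61, 89);
```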
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
Review Comment:
Same, Assume
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
Review Comment:
Why not just assertEquals?
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +273,199 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ if (vectorized) {
+ return;
+ }
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertTrue("Table should contain expected row",
actual.equals(expected));
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ if (vectorized) {
Review Comment:
Assume
##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -227,6 +237,39 @@ public void close() {
}
}
+ private static class PositionStreamDeleteMarker<T> extends
PositionStreamDeleteFilter<T> {
Review Comment:
Nit: do you think it would be cleaner to have most of the logic in:
```
abstract class PositionStreamDeleteIterable<T> {
  abstract CloseableIterator<T> createPosDeleteIterator(CloseableIterator<T> items);
}
```
and have the two concrete subclasses (PositionStreamDeleteMarker and
PositionStreamDeleteFilter) extend it?
I think having the Marker extend the Filter still seems a bit strange,
though the logic is correctly refactored now.
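i.e. roughly (sketch only):
```
private abstract static class PositionStreamDeleteIterable<T> extends CloseableGroup implements CloseableIterable<T> {
  // shared state plus the iterator()/position-advancing logic would live here
  abstract CloseableIterator<T> createPosDeleteIterator(CloseableIterator<T> items);
}

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
  @Override
  CloseableIterator<T> createPosDeleteIterator(CloseableIterator<T> items) {
    // skip rows whose position is deleted
    ...
  }
}

private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteIterable<T> {
  @Override
  CloseableIterator<T> createPosDeleteIterator(CloseableIterator<T> items) {
    // keep every row and set the _deleted marker on deleted positions instead
    ...
  }
}
```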
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.