stevenzwu commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r867300251
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +271,191 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+ validate(expected, actual);
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+ validate(expectedRowSet(29, 89, 122), actual);
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+ validate(expected, actual);
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ // get undeleted rows
+ StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = table.schema().select("id", "data",
"_deleted").asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ validate(expected, actual);
+
+ // get deleted rows
+ expected = expectedRowSet(false, true, 29, 43, 61, 89);
+
+ df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = true");
+
+ StructLikeSet actualDeleted = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actualDeleted.add(rowWrapper.wrap(row));
+ });
+
+ validate(expected, actualDeleted);
+ }
+
+ private static final Schema PROJECTION_SCHEMA = new Schema(
+ required(100, "id", Types.LongType.get()),
+ required(101, "data", Types.StringType.get()),
+ MetadataColumns.IS_DELETED
+ );
+
+ private void validate(StructLikeSet expected, StructLikeSet actual) {
+ Assert.assertEquals("Table should contain the same number of rows",
expected.size(), actual.size());
+ for (StructLike expectedRow : expected) {
+ Assert.assertTrue("Table should contain expected row",
actual.contains(expectedRow));
+ }
+ }
+
+ protected static StructLikeSet expectedRowSet(int... idsToRemove) {
+ return expectedRowSet(false, false, idsToRemove);
+ }
+
+ protected static StructLikeSet expectedRowSet(boolean removeDeleted, boolean
removeUndeleted, int... idsToRemove) {
+ List<Record> records = getRecordsWithDeletedColumn();
+
+ Set<Integer> deletedIds =
Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
+ StructLikeSet set = StructLikeSet.create(PROJECTION_SCHEMA.asStruct());
+ for (Record record : records) {
+ if (deletedIds.contains(record.getField("id"))) {
+ if (removeDeleted) {
+ continue;
+ } else {
+ record.setField(MetadataColumns.IS_DELETED.name(), true);
+ }
+ } else if (removeUndeleted) {
+ continue;
+ }
+
+ StructLike structLike = new
InternalRecordWrapper(PROJECTION_SCHEMA.asStruct()).wrap(record);
+ set.add(structLike);
+ }
+ return set;
+ }
+
+ @NotNull
+ private static List getRecordsWithDeletedColumn() {
Review Comment:
nit: Iceberg coding style avoids the `get` prefix in method names. Also, the
return type should probably be parameterized (e.g. `List<Record>`) rather than
a raw `List`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]