stevenzwu commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r868816035


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +271,189 @@ public void testPosDeletesAllRowsInBatch() throws 
IOException {
 
     Assert.assertEquals("Table should contain expected rows", expected, 
actual);
   }
+
+  @Test
+  public void testPosDeletesWithDeletedColumn() throws IOException {
+    // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the 
first batch are all deleted.
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeleteWithDeletedColumn() throws IOException {
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 
1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+    validate(expectedRowSet(29, 89, 122), actual);
+  }
+
+  @Test
+  public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, dataSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 5L) // id = 121
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testFilterOnDeletedMetadataColumn() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    // get undeleted rows
+    StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = false");
+
+    Types.StructType projection = table.schema().select("id", "data", 
"_deleted").asStruct();
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actual);
+
+    // get deleted rows
+    expected = expectedRowSet(false, true, 29, 43, 61, 89);
+
+    df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = true");
+
+    StructLikeSet actualDeleted = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actualDeleted.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actualDeleted);
+  }
+
+  private static final Schema PROJECTION_SCHEMA = new Schema(
+      required(1, "id", Types.LongType.get()),
+      required(2, "data", Types.StringType.get()),
+      MetadataColumns.IS_DELETED
+  );
+
+  private void validate(StructLikeSet expected, StructLikeSet actual) {
+    Assert.assertEquals("Table should contain the same number of rows", 
expected.size(), actual.size());
+    for (StructLike expectedRow : expected) {
+      Assert.assertTrue("Table should contain expected row", 
actual.contains(expectedRow));
+    }
+  }
+
+  private static StructLikeSet expectedRowSet(int... idsToRemove) {
+    return expectedRowSet(false, false, idsToRemove);
+  }
+
+  private static StructLikeSet expectedRowSet(boolean removeDeleted, boolean 
removeUndeleted, int... idsToRemove) {
+    List<Record> records = recordsWithDeletedColumn();
+
+    Set<Integer> deletedIds = 
Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));

Review Comment:
   nit: remove the extra empty lines before and after this statement. Also consider 
moving this line up to be the first line of the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to