stevenzwu commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r867301186


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +271,191 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
+
+  @Test
+  public void testPosDeletesWithDeletedColumn() throws IOException {
+    // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted.
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeleteWithDeletedColumn() throws IOException {
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+    validate(expectedRowSet(29, 89, 122), actual);
+  }
+
+  @Test
+  public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 5L) // id = 121
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testFilterOnDeletedMetadataColumn() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    // get undeleted rows
+    StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = false");
+
+    Types.StructType projection = table.schema().select("id", "data", "_deleted").asStruct();
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actual);
+
+    // get deleted rows
+    expected = expectedRowSet(false, true, 29, 43, 61, 89);
+
+    df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = true");
+
+    StructLikeSet actualDeleted = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actualDeleted.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actualDeleted);
+  }
+
+  private static final Schema PROJECTION_SCHEMA = new Schema(
+      required(100, "id", Types.LongType.get()),
+      required(101, "data", Types.StringType.get()),
+      MetadataColumns.IS_DELETED
+  );
+
+  private void validate(StructLikeSet expected, StructLikeSet actual) {
+    Assert.assertEquals("Table should contain the same number of rows", expected.size(), actual.size());
+    for (StructLike expectedRow : expected) {
+      Assert.assertTrue("Table should contain expected row", actual.contains(expectedRow));
+    }
+  }
+
+  protected static StructLikeSet expectedRowSet(int... idsToRemove) {
+    return expectedRowSet(false, false, idsToRemove);
+  }
+
+  protected static StructLikeSet expectedRowSet(boolean removeDeleted, boolean removeUndeleted, int... idsToRemove) {

Review Comment:
   `removeUndeleted` was a slightly confusing name to me on first read. Should we use a predicate to capture the filtering intent, instead of the `removeDeleted`/`removeUndeleted` flag pair? I guess the expected `_deleted` column values won't be able to be captured by a predicate alone, though.
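
   As a rough illustration of that suggestion (not code from this PR), a predicate-based variant might look something like the sketch below. `ExpectedRow` and `expectedRows` are hypothetical stand-ins for the test's helpers; the only point is replacing the two boolean flags with a single `Predicate`.

   ```java
   import java.util.List;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   // Minimal, self-contained sketch: one Predicate decides which rows stay in
   // the expected set, instead of the removeDeleted/removeUndeleted pair.
   public class ExpectedRowsSketch {

     // Stand-in for an expected row; only the _deleted flag matters here.
     static class ExpectedRow {
       final long id;
       final boolean isDeleted;

       ExpectedRow(long id, boolean isDeleted) {
         this.id = id;
         this.isDeleted = isDeleted;
       }
     }

     // Hypothetical helper: keep exactly the rows the predicate accepts.
     static List<ExpectedRow> expectedRows(List<ExpectedRow> allRows, Predicate<ExpectedRow> keep) {
       return allRows.stream().filter(keep).collect(Collectors.toList());
     }

     public static void main(String[] args) {
       List<ExpectedRow> rows = List.of(
           new ExpectedRow(29L, true),   // marked deleted by a position delete
           new ExpectedRow(43L, true),
           new ExpectedRow(121L, false), // still live
           new ExpectedRow(122L, false));

       // Equivalent of expectedRowSet(false, true, ...): keep only the deleted rows.
       List<ExpectedRow> onlyDeleted = expectedRows(rows, row -> row.isDeleted);

       // Equivalent of expectedRowSet(true, false, ...): keep only the undeleted rows.
       List<ExpectedRow> onlyUndeleted = expectedRows(rows, row -> !row.isDeleted);

       System.out.println(onlyDeleted.size() + " deleted, " + onlyUndeleted.size() + " undeleted");
     }
   }
   ```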


