flyrain commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r868259919


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +271,191 @@ public void testPosDeletesAllRowsInBatch() throws 
IOException {
 
     Assert.assertEquals("Table should contain expected rows", expected, 
actual);
   }
+
+  @Test
+  public void testPosDeletesWithDeletedColumn() throws IOException {
+    // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the 
first batch are all deleted.
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeleteWithDeletedColumn() throws IOException {
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 
1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+    validate(expectedRowSet(29, 89, 122), actual);
+  }
+
+  @Test
+  public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
dataDeletes, dataSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 5L) // id = 121
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(tableName, table, "id", "data", "_deleted");
+
+    validate(expected, actual);
+  }
+
+  @Test
+  public void testFilterOnDeletedMetadataColumn() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L) // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), 
deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    // get undeleted rows
+    StructLikeSet expected = expectedRowSet(true, false, 29, 43, 61, 89);
+
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = false");
+
+    Types.StructType projection = table.schema().select("id", "data", 
"_deleted").asStruct();
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actual);
+
+    // get deleted rows
+    expected = expectedRowSet(false, true, 29, 43, 61, 89);
+
+    df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = true");
+
+    StructLikeSet actualDeleted = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actualDeleted.add(rowWrapper.wrap(row));
+    });
+
+    validate(expected, actualDeleted);
+  }
+
+  private static final Schema PROJECTION_SCHEMA = new Schema(
+      required(100, "id", Types.LongType.get()),

Review Comment:
   It doesn't matter. I will use the same id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to