flyrain commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r883362351
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -262,4 +274,208 @@ public void testPosDeletesAllRowsInBatch() throws
IOException {
Assert.assertEquals("Table should contain expected rows", expected,
actual);
}
+
+ @Test
+ public void testPosDeletesWithDeletedColumn() throws IOException {
+ Assume.assumeFalse(vectorized);
+
+ // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the
first batch are all deleted.
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertEquals("Table should contain expected row", expected, actual);
+ }
+
+ @Test
+ public void testEqualityDeleteWithDeletedColumn() throws IOException {
+ Assume.assumeFalse(vectorized);
+
+ String tableName = table.name().substring(table.name().lastIndexOf(".") +
1);
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertEquals("Table should contain expected row", expected, actual);
+ }
+
+ @Test
+ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+ Assume.assumeFalse(vectorized);
+
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(),
"id", "data", "_deleted");
+
+ Assert.assertEquals("Table should contain expected row", expected, actual);
+ }
+
+ @Test
+ public void testFilterOnDeletedMetadataColumn() throws IOException {
+ Assume.assumeFalse(vectorized);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 1L), // id = 43
+ Pair.of(dataFile.path(), 2L), // id = 61
+ Pair.of(dataFile.path(), 3L) // id = 89
+ );
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ StructLikeSet expected = expectedRowSetWithNonDeletesOnly(29, 43, 61, 89);
+
+ // get non-deleted rows
+ Dataset<Row> df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = false");
+
+ Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+ StructLikeSet actual = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actual.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertEquals("Table should contain expected row", expected, actual);
+
+ StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61,
89);
+
+ // get deleted rows
+ df = spark.read()
+ .format("iceberg")
+ .load(TableIdentifier.of("default", tableName).toString())
+ .select("id", "data", "_deleted")
+ .filter("_deleted = true");
+
+ StructLikeSet actualDeleted = StructLikeSet.create(projection);
+ df.collectAsList().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ actualDeleted.add(rowWrapper.wrap(row));
+ });
+
+ Assert.assertEquals("Table should contain expected row", expectedDeleted,
actualDeleted);
+ }
+
+ @Test
+ public void testIsDeletedColumnWithoutDeleteFile() {
Review Comment:
Add a test that projects the `_deleted` metadata column when there is no delete file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]