This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ebb241232 Core, Spark: Correct the delete record count for PartitionTable (#9389)
7ebb241232 is described below

commit 7ebb241232e7a88f30257fa657f44cfe268c8a95
Author: Xianyang Liu <[email protected]>
AuthorDate: Wed Jan 3 00:24:32 2024 +0800

    Core, Spark: Correct the delete record count for PartitionTable (#9389)
---
 .../java/org/apache/iceberg/PartitionsTable.java   |  4 +-
 .../spark/source/TestIcebergSourceTablesBase.java  | 47 +++++++++++++---------
 .../spark/source/TestIcebergSourceTablesBase.java  | 46 +++++++++++++--------
 .../spark/source/TestIcebergSourceTablesBase.java  | 46 +++++++++++++--------
 4 files changed, 89 insertions(+), 54 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java 
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 2537f5172b..5ff796e958 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -314,12 +314,12 @@ public class PartitionsTable extends BaseMetadataTable {
           this.dataFileSizeInBytes += file.fileSizeInBytes();
           break;
         case POSITION_DELETES:
-          this.posDeleteRecordCount = file.recordCount();
+          this.posDeleteRecordCount += file.recordCount();
           this.posDeleteFileCount += 1;
           this.specId = file.specId();
           break;
         case EQUALITY_DELETES:
-          this.eqDeleteRecordCount = file.recordCount();
+          this.eqDeleteRecordCount += file.recordCount();
           this.eqDeleteFileCount += 1;
           this.specId = file.specId();
           break;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 143488176f..3c52652748 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1648,7 +1655,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1701,13 +1709,12 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
-            .set("spec_id", 0)
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", 
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2240,22 +2247,26 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), 
null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, 
ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 486713e52e..ebd0933a95 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1600,9 +1600,15 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1622,8 +1628,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
 
     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1646,7 +1653,7 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1662,13 +1669,13 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1682,8 +1689,9 @@ public abstract class TestIcebergSourceTablesBase extends 
SparkTestBase {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1699,12 +1707,12 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", 
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2237,22 +2245,26 @@ public abstract class TestIcebergSourceTablesBase 
extends SparkTestBase {
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), 
null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, 
ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 29ccba5a27..4f585eee51 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public abstract class TestIcebergSourceTablesBase 
extends TestBase {
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+            SimpleRecord.class);
     Dataset<Row> df2 =
-        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+        spark.createDataFrame(
+            Lists.newArrayList(
+                new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+            SimpleRecord.class);
 
     df1.select("id", "data")
         .write()
@@ -1624,8 +1630,9 @@ public abstract class TestIcebergSourceTablesBase extends 
TestBase {
 
     // test position deletes
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
-    DeleteFile deleteFile = writePosDeleteFile(table);
-    table.newRowDelta().addDeletes(deleteFile).commit();
+    DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+    DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+    table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
     table.refresh();
     long posDeleteCommitId = table.currentSnapshot().snapshotId();
 
@@ -1648,7 +1655,7 @@ public abstract class TestIcebergSourceTablesBase extends 
TestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public abstract class TestIcebergSourceTablesBase 
extends TestBase {
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 2).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set(
                 "total_data_file_size_in_bytes",
                 
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
-            .set("position_delete_record_count", 1L) // should be incremented now
-            .set("position_delete_file_count", 1) // should be incremented now
+            .set("position_delete_record_count", 2L) // should be incremented now
+            .set("position_delete_file_count", 2) // should be incremented now
             .set("equality_delete_record_count", 0L)
             .set("equality_delete_file_count", 0)
             .set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public abstract class TestIcebergSourceTablesBase extends 
TestBase {
     }
 
     // test equality delete
-    DeleteFile eqDeleteFile = writeEqDeleteFile(table);
-    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+    DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+    DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+    table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
     table.refresh();
     long eqDeleteCommitId = table.currentSnapshot().snapshotId();
     actual =
@@ -1701,12 +1709,12 @@ public abstract class TestIcebergSourceTablesBase 
extends TestBase {
         0,
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
-            .set("record_count", 1L)
+            .set("record_count", 3L)
             .set("file_count", 1)
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
-            .set("equality_delete_record_count", 1L) // should be incremented now
-            .set("equality_delete_file_count", 1) // should be incremented now
+            .set("equality_delete_record_count", 2L) // should be incremented now
+            .set("equality_delete_file_count", 2) // should be incremented now
             .set("last_updated_at", 
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
             .set("last_updated_snapshot_id", eqDeleteCommitId)
             .build());
@@ -2262,22 +2270,26 @@ public abstract class TestIcebergSourceTablesBase 
extends TestBase {
   }
 
   private DeleteFile writePosDeleteFile(Table table) {
+    return writePosDeleteFile(table, 0L);
+  }
+
+  private DeleteFile writePosDeleteFile(Table table, long pos) {
     DataFile dataFile =
         Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), 
null);
     PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
     StructLike dataFilePartition = dataFile.partition();
 
     PositionDelete<InternalRow> delete = PositionDelete.create();
-    delete.set(dataFile.path(), 0L, null);
+    delete.set(dataFile.path(), pos, null);
 
     return writePositionDeletes(table, dataFileSpec, dataFilePartition, 
ImmutableList.of(delete));
   }
 
-  private DeleteFile writeEqDeleteFile(Table table) {
+  private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
     List<Record> deletes = Lists.newArrayList();
-    Schema deleteRowSchema = SCHEMA.select("id");
+    Schema deleteRowSchema = SCHEMA.select("data");
     Record delete = GenericRecord.create(deleteRowSchema);
-    deletes.add(delete.copy("id", 1));
+    deletes.add(delete.copy("data", dataValue));
     try {
       return FileHelpers.writeDeleteFile(
           table,

Reply via email to