This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 7ebb241232 Core, Spark: Correct the delete record count for PartitionTable (#9389)
7ebb241232 is described below
commit 7ebb241232e7a88f30257fa657f44cfe268c8a95
Author: Xianyang Liu <[email protected]>
AuthorDate: Wed Jan 3 00:24:32 2024 +0800
Core, Spark: Correct the delete record count for PartitionTable (#9389)
---
.../java/org/apache/iceberg/PartitionsTable.java | 4 +-
.../spark/source/TestIcebergSourceTablesBase.java | 47 +++++++++++++---------
.../spark/source/TestIcebergSourceTablesBase.java | 46 +++++++++++++--------
.../spark/source/TestIcebergSourceTablesBase.java | 46 +++++++++++++--------
4 files changed, 89 insertions(+), 54 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 2537f5172b..5ff796e958 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -314,12 +314,12 @@ public class PartitionsTable extends BaseMetadataTable {
this.dataFileSizeInBytes += file.fileSizeInBytes();
break;
case POSITION_DELETES:
- this.posDeleteRecordCount = file.recordCount();
+ this.posDeleteRecordCount += file.recordCount();
this.posDeleteFileCount += 1;
this.specId = file.specId();
break;
case EQUALITY_DELETES:
- this.eqDeleteRecordCount = file.recordCount();
+ this.eqDeleteRecordCount += file.recordCount();
this.eqDeleteFileCount += 1;
this.specId = file.specId();
break;
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 143488176f..3c52652748 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table partitionsTable = loadTable(tableIdentifier, "partitions");
Dataset<Row> df1 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+ SimpleRecord.class);
Dataset<Row> df2 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+ SimpleRecord.class);
df1.select("id", "data")
.write()
@@ -1624,8 +1630,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
// test position deletes
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
- DeleteFile deleteFile = writePosDeleteFile(table);
- table.newRowDelta().addDeletes(deleteFile).commit();
+ DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+ DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+
+ table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
table.refresh();
long posDeleteCommitId = table.currentSnapshot().snapshotId();
@@ -1648,7 +1655,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 2).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
- .set("position_delete_record_count", 1L) // should be incremented now
- .set("position_delete_file_count", 1) // should be incremented now
+ .set("position_delete_record_count", 2L) // should be incremented now
+ .set("position_delete_file_count", 2) // should be incremented now
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
// test equality delete
- DeleteFile eqDeleteFile = writeEqDeleteFile(table);
- table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+ DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+
+ table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
table.refresh();
long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
@@ -1701,13 +1709,12 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
0,
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
- .set("equality_delete_record_count", 1L) // should be incremented now
- .set("equality_delete_file_count", 1) // should be incremented now
- .set("spec_id", 0)
+ .set("equality_delete_record_count", 2L) // should be incremented now
+ .set("equality_delete_file_count", 2) // should be incremented now
.set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
@@ -2240,22 +2247,26 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
}
private DeleteFile writePosDeleteFile(Table table) {
+ return writePosDeleteFile(table, 0L);
+ }
+
+ private DeleteFile writePosDeleteFile(Table table, long pos) {
DataFile dataFile =
Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()),
null);
PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
StructLike dataFilePartition = dataFile.partition();
PositionDelete<InternalRow> delete = PositionDelete.create();
- delete.set(dataFile.path(), 0L, null);
+ delete.set(dataFile.path(), pos, null);
return writePositionDeletes(table, dataFileSpec, dataFilePartition,
ImmutableList.of(delete));
}
- private DeleteFile writeEqDeleteFile(Table table) {
+ private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
List<Record> deletes = Lists.newArrayList();
- Schema deleteRowSchema = SCHEMA.select("id");
+ Schema deleteRowSchema = SCHEMA.select("data");
Record delete = GenericRecord.create(deleteRowSchema);
- deletes.add(delete.copy("id", 1));
+ deletes.add(delete.copy("data", dataValue));
try {
return FileHelpers.writeDeleteFile(
table,
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 486713e52e..ebd0933a95 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1600,9 +1600,15 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table partitionsTable = loadTable(tableIdentifier, "partitions");
Dataset<Row> df1 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+ SimpleRecord.class);
Dataset<Row> df2 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+ SimpleRecord.class);
df1.select("id", "data")
.write()
@@ -1622,8 +1628,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
// test position deletes
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
- DeleteFile deleteFile = writePosDeleteFile(table);
- table.newRowDelta().addDeletes(deleteFile).commit();
+ DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+ DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+
+ table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
table.refresh();
long posDeleteCommitId = table.currentSnapshot().snapshotId();
@@ -1646,7 +1653,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
@@ -1662,13 +1669,13 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 2).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
- .set("position_delete_record_count", 1L) // should be incremented now
- .set("position_delete_file_count", 1) // should be incremented now
+ .set("position_delete_record_count", 2L) // should be incremented now
+ .set("position_delete_file_count", 2) // should be incremented now
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
@@ -1682,8 +1689,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
}
// test equality delete
- DeleteFile eqDeleteFile = writeEqDeleteFile(table);
- table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+ DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+
+ table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
table.refresh();
long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
@@ -1699,12 +1707,12 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
0,
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
- .set("equality_delete_record_count", 1L) // should be incremented now
- .set("equality_delete_file_count", 1) // should be incremented now
+ .set("equality_delete_record_count", 2L) // should be incremented now
+ .set("equality_delete_file_count", 2) // should be incremented now
.set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
@@ -2237,22 +2245,26 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
}
private DeleteFile writePosDeleteFile(Table table) {
+ return writePosDeleteFile(table, 0L);
+ }
+
+ private DeleteFile writePosDeleteFile(Table table, long pos) {
DataFile dataFile =
Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()),
null);
PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
StructLike dataFilePartition = dataFile.partition();
PositionDelete<InternalRow> delete = PositionDelete.create();
- delete.set(dataFile.path(), 0L, null);
+ delete.set(dataFile.path(), pos, null);
return writePositionDeletes(table, dataFileSpec, dataFilePartition,
ImmutableList.of(delete));
}
- private DeleteFile writeEqDeleteFile(Table table) {
+ private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
List<Record> deletes = Lists.newArrayList();
- Schema deleteRowSchema = SCHEMA.select("id");
+ Schema deleteRowSchema = SCHEMA.select("data");
Record delete = GenericRecord.create(deleteRowSchema);
- deletes.add(delete.copy("id", 1));
+ deletes.add(delete.copy("data", dataValue));
try {
return FileHelpers.writeDeleteFile(
table,
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 29ccba5a27..4f585eee51 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1602,9 +1602,15 @@ public abstract class TestIcebergSourceTablesBase
extends TestBase {
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
Table partitionsTable = loadTable(tableIdentifier, "partitions");
Dataset<Row> df1 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")),
+ SimpleRecord.class);
Dataset<Row> df2 =
- spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+ spark.createDataFrame(
+ Lists.newArrayList(
+ new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")),
+ SimpleRecord.class);
df1.select("id", "data")
.write()
@@ -1624,8 +1630,9 @@ public abstract class TestIcebergSourceTablesBase extends
TestBase {
// test position deletes
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
- DeleteFile deleteFile = writePosDeleteFile(table);
- table.newRowDelta().addDeletes(deleteFile).commit();
+ DeleteFile deleteFile1 = writePosDeleteFile(table, 0);
+ DeleteFile deleteFile2 = writePosDeleteFile(table, 1);
+
+ table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2).commit();
table.refresh();
long posDeleteCommitId = table.currentSnapshot().snapshotId();
@@ -1648,7 +1655,7 @@ public abstract class TestIcebergSourceTablesBase extends
TestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
@@ -1664,13 +1671,13 @@ public abstract class TestIcebergSourceTablesBase
extends TestBase {
expected.add(
builder
.set("partition", partitionBuilder.set("id", 2).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set(
"total_data_file_size_in_bytes",
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
- .set("position_delete_record_count", 1L) // should be incremented now
- .set("position_delete_file_count", 1) // should be incremented now
+ .set("position_delete_record_count", 2L) // should be incremented now
+ .set("position_delete_file_count", 2) // should be incremented now
.set("equality_delete_record_count", 0L)
.set("equality_delete_file_count", 0)
.set("spec_id", 0)
@@ -1684,8 +1691,9 @@ public abstract class TestIcebergSourceTablesBase extends
TestBase {
}
// test equality delete
- DeleteFile eqDeleteFile = writeEqDeleteFile(table);
- table.newRowDelta().addDeletes(eqDeleteFile).commit();
+ DeleteFile eqDeleteFile1 = writeEqDeleteFile(table, "d");
+ DeleteFile eqDeleteFile2 = writeEqDeleteFile(table, "f");
+
+ table.newRowDelta().addDeletes(eqDeleteFile1).addDeletes(eqDeleteFile2).commit();
table.refresh();
long eqDeleteCommitId = table.currentSnapshot().snapshotId();
actual =
@@ -1701,12 +1709,12 @@ public abstract class TestIcebergSourceTablesBase
extends TestBase {
0,
builder
.set("partition", partitionBuilder.set("id", 1).build())
- .set("record_count", 1L)
+ .set("record_count", 3L)
.set("file_count", 1)
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
- .set("equality_delete_record_count", 1L) // should be incremented now
- .set("equality_delete_file_count", 1) // should be incremented now
+ .set("equality_delete_record_count", 2L) // should be incremented now
+ .set("equality_delete_file_count", 2) // should be incremented now
.set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000)
.set("last_updated_snapshot_id", eqDeleteCommitId)
.build());
@@ -2262,22 +2270,26 @@ public abstract class TestIcebergSourceTablesBase
extends TestBase {
}
private DeleteFile writePosDeleteFile(Table table) {
+ return writePosDeleteFile(table, 0L);
+ }
+
+ private DeleteFile writePosDeleteFile(Table table, long pos) {
DataFile dataFile =
Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()),
null);
PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
StructLike dataFilePartition = dataFile.partition();
PositionDelete<InternalRow> delete = PositionDelete.create();
- delete.set(dataFile.path(), 0L, null);
+ delete.set(dataFile.path(), pos, null);
return writePositionDeletes(table, dataFileSpec, dataFilePartition,
ImmutableList.of(delete));
}
- private DeleteFile writeEqDeleteFile(Table table) {
+ private DeleteFile writeEqDeleteFile(Table table, String dataValue) {
List<Record> deletes = Lists.newArrayList();
- Schema deleteRowSchema = SCHEMA.select("id");
+ Schema deleteRowSchema = SCHEMA.select("data");
Record delete = GenericRecord.create(deleteRowSchema);
- deletes.add(delete.copy("id", 1));
+ deletes.add(delete.copy("data", dataValue));
try {
return FileHelpers.writeDeleteFile(
table,