This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 025cdf0115 Core: Add total data size to Partitions table (#7920)
025cdf0115 is described below
commit 025cdf0115212ac265fadad357234f9aa9cf1dfd
Author: hsiang-c <[email protected]>
AuthorDate: Sat Jul 8 01:35:01 2023 +0800
Core: Add total data size to Partitions table (#7920)
---
.../java/org/apache/iceberg/PartitionsTable.java | 11 +++++
.../spark/source/TestIcebergSourceTablesBase.java | 49 +++++++++++++++++++
.../spark/source/TestIcebergSourceTablesBase.java | 55 ++++++++++++++++++++++
.../spark/source/TestIcebergSourceTablesBase.java | 54 +++++++++++++++++++++
.../spark/source/TestIcebergSourceTablesBase.java | 55 ++++++++++++++++++++++
5 files changed, 224 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 71c5a29ef0..d93200c7cf 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -53,6 +53,11 @@ public class PartitionsTable extends BaseMetadataTable {
2, "record_count", Types.LongType.get(), "Count of records in
data files"),
Types.NestedField.required(
3, "file_count", Types.IntegerType.get(), "Count of data
files"),
+ Types.NestedField.required(
+ 11,
+ "total_data_file_size_in_bytes",
+ Types.LongType.get(),
+ "Total size in bytes of data files"),
Types.NestedField.required(
5,
"position_delete_record_count",
@@ -97,6 +102,7 @@ public class PartitionsTable extends BaseMetadataTable {
return schema.select(
"record_count",
"file_count",
+ "total_data_file_size_in_bytes",
"position_delete_record_count",
"position_delete_file_count",
"equality_delete_record_count",
@@ -125,6 +131,7 @@ public class PartitionsTable extends BaseMetadataTable {
StaticDataTask.Row.of(
root.dataRecordCount,
root.dataFileCount,
+ root.dataFileSizeInBytes,
root.posDeleteRecordCount,
root.posDeleteFileCount,
root.eqDeleteRecordCount,
@@ -147,6 +154,7 @@ public class PartitionsTable extends BaseMetadataTable {
partition.specId,
partition.dataRecordCount,
partition.dataFileCount,
+ partition.dataFileSizeInBytes,
partition.posDeleteRecordCount,
partition.posDeleteFileCount,
partition.eqDeleteRecordCount,
@@ -269,6 +277,7 @@ public class PartitionsTable extends BaseMetadataTable {
private int specId;
private long dataRecordCount;
private int dataFileCount;
+ private long dataFileSizeInBytes;
private long posDeleteRecordCount;
private int posDeleteFileCount;
private long eqDeleteRecordCount;
@@ -281,6 +290,7 @@ public class PartitionsTable extends BaseMetadataTable {
this.specId = 0;
this.dataRecordCount = 0L;
this.dataFileCount = 0;
+ this.dataFileSizeInBytes = 0L;
this.posDeleteRecordCount = 0L;
this.posDeleteFileCount = 0;
this.eqDeleteRecordCount = 0L;
@@ -301,6 +311,7 @@ public class PartitionsTable extends BaseMetadataTable {
this.dataRecordCount += file.recordCount();
this.dataFileCount += 1;
this.specId = file.specId();
+ this.dataFileSizeInBytes += file.fileSizeInBytes();
break;
case POSITION_DELETES:
this.posDeleteRecordCount = file.recordCount();
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5f06243e6f..fcf61d5f75 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -25,6 +25,7 @@ import static
org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
@@ -37,6 +38,7 @@ import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -1233,6 +1235,11 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Types.StructType.of(
required(2, "record_count", Types.LongType.get(), "Count of
records in data files"),
required(3, "file_count", Types.IntegerType.get(), "Count of data
files"),
+ required(
+ 11,
+ "total_data_file_size_in_bytes",
+ Types.LongType.get(),
+ "Total size in bytes of data files"),
required(
5,
"position_delete_record_count",
@@ -1279,6 +1286,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1345,6 +1355,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1358,6 +1371,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1469,12 +1485,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
new GenericRecordBuilder(
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+
+ List<DataFile> dataFiles = dataFiles(table);
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1488,6 +1509,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 2L)
.set("file_count", 2)
+ .set(
+ "total_data_file_size_in_bytes",
+ dataFiles.get(1).fileSizeInBytes() +
dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1525,6 +1549,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -2028,4 +2053,38 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
}
+
+  /** Returns the sum of {@link DataFile#fileSizeInBytes()} over {@code dataFiles}. */
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  /** Returns the data file of each task from a full scan of {@code table}, in plan order. */
+  private List<DataFile> dataFiles(Table table) {
+    // planFiles() returns a CloseableIterable; close it once its files are copied out.
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Asserts that {@code dataFiles} has exactly one entry per expected partition id and that, in
+   * order, each file's first partition field equals the corresponding expected id.
+   */
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
}
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c8c682c01d..f90eb87ce1 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -25,6 +25,7 @@ import static
org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
@@ -36,6 +37,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1238,6 +1240,11 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Types.StructType.of(
required(2, "record_count", Types.LongType.get(), "Count of
records in data files"),
required(3, "file_count", Types.IntegerType.get(), "Count of data
files"),
+ required(
+ 11,
+ "total_data_file_size_in_bytes",
+ Types.LongType.get(),
+ "Total size in bytes of data files"),
required(
5,
"position_delete_record_count",
@@ -1284,6 +1291,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1350,6 +1360,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1363,6 +1376,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1474,12 +1490,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
new GenericRecordBuilder(
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+
+ List<DataFile> dataFiles = dataFiles(table);
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1493,6 +1514,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 2L)
.set("file_count", 2)
+ .set(
+ "total_data_file_size_in_bytes",
+ dataFiles.get(1).fileSizeInBytes() +
dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1530,6 +1554,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1611,6 +1636,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1624,6 +1652,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 1L) // should be incremented
now
.set("position_delete_file_count", 1) // should be incremented now
.set("equality_delete_record_count", 0L)
@@ -2159,4 +2190,38 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
throw new RuntimeException(e);
}
}
+
+  /** Returns the sum of {@link DataFile#fileSizeInBytes()} over {@code dataFiles}. */
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  /** Returns the data file of each task from a full scan of {@code table}, in plan order. */
+  private List<DataFile> dataFiles(Table table) {
+    // planFiles() returns a CloseableIterable; close it once its files are copied out.
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Asserts that {@code dataFiles} has exactly one entry per expected partition id and that, in
+   * order, each file's first partition field equals the corresponding expected id.
+   */
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 199104c03e..c92e1c2759 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
@@ -39,6 +40,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1243,6 +1245,11 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Types.StructType.of(
required(2, "record_count", Types.LongType.get(), "Count of
records in data files"),
required(3, "file_count", Types.IntegerType.get(), "Count of data
files"),
+ required(
+ 11,
+ "total_data_file_size_in_bytes",
+ Types.LongType.get(),
+ "Total size in bytes of data files"),
required(
5,
"position_delete_record_count",
@@ -1289,6 +1296,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1355,6 +1365,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1368,6 +1381,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1473,6 +1489,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.orderBy("partition.id")
.collectAsList();
+ List<DataFile> dataFiles = dataFiles(table);
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
GenericRecordBuilder builder =
new
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(),
"partitions"));
GenericRecordBuilder partitionBuilder =
@@ -1485,6 +1504,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1498,6 +1518,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 2L)
.set("file_count", 2)
+ .set(
+ "total_data_file_size_in_bytes",
+ dataFiles.get(1).fileSizeInBytes() +
dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1535,6 +1558,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1616,6 +1640,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1629,6 +1656,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 1L) // should be incremented
now
.set("position_delete_file_count", 1) // should be incremented now
.set("equality_delete_record_count", 0L)
@@ -2227,4 +2257,38 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
throw new RuntimeException(e);
}
}
+
+  /** Returns the sum of {@link DataFile#fileSizeInBytes()} over {@code dataFiles}. */
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  /** Returns the data file of each task from a full scan of {@code table}, in plan order. */
+  private List<DataFile> dataFiles(Table table) {
+    // planFiles() returns a CloseableIterable; close it once its files are copied out.
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Asserts that {@code dataFiles} has exactly one entry per expected partition id and that, in
+   * order, each file's first partition field equals the corresponding expected id.
+   */
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index daed197cc9..8f78697212 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
@@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -1240,6 +1242,11 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
Types.StructType.of(
required(2, "record_count", Types.LongType.get(), "Count of
records in data files"),
required(3, "file_count", Types.IntegerType.get(), "Count of data
files"),
+ required(
+ 11,
+ "total_data_file_size_in_bytes",
+ Types.LongType.get(),
+ "Total size in bytes of data files"),
required(
5,
"position_delete_record_count",
@@ -1286,6 +1293,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("last_updated_snapshot_id",
table.currentSnapshot().snapshotId())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1352,6 +1362,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1365,6 +1378,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1476,12 +1492,17 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
new GenericRecordBuilder(
AvroSchemaUtil.convert(
partitionsTable.schema().findType("partition").asStructType(),
"partition"));
+
+ List<DataFile> dataFiles = dataFiles(table);
+ assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
List<GenericData.Record> expected = Lists.newArrayList();
expected.add(
builder
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1495,6 +1516,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 2L)
.set("file_count", 2)
+ .set(
+ "total_data_file_size_in_bytes",
+ dataFiles.get(1).fileSizeInBytes() +
dataFiles.get(2).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1532,6 +1556,7 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set("total_data_file_size_in_bytes",
dataFiles.get(0).fileSizeInBytes())
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1613,6 +1638,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 1).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 0L)
.set("position_delete_file_count", 0)
.set("equality_delete_record_count", 0L)
@@ -1626,6 +1654,9 @@ public abstract class TestIcebergSourceTablesBase extends
SparkTestBase {
.set("partition", partitionBuilder.set("id", 2).build())
.set("record_count", 1L)
.set("file_count", 1)
+ .set(
+ "total_data_file_size_in_bytes",
+
totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
.set("position_delete_record_count", 1L) // should be incremented
now
.set("position_delete_file_count", 1) // should be incremented now
.set("equality_delete_record_count", 0L)
@@ -2223,4 +2254,38 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
throw new RuntimeException(e);
}
}
+
+  /** Returns the sum of {@link DataFile#fileSizeInBytes()} over {@code dataFiles}. */
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  /** Returns the data file of each task from a full scan of {@code table}, in plan order. */
+  private List<DataFile> dataFiles(Table table) {
+    // planFiles() returns a CloseableIterable; close it once its files are copied out.
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Asserts that {@code dataFiles} has exactly one entry per expected partition id and that, in
+   * order, each file's first partition field equals the corresponding expected id.
+   */
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
}