This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 025cdf0115 Core: Add total data size to Partitions table (#7920)
025cdf0115 is described below

commit 025cdf0115212ac265fadad357234f9aa9cf1dfd
Author: hsiang-c <[email protected]>
AuthorDate: Sat Jul 8 01:35:01 2023 +0800

    Core: Add total data size to Partitions table (#7920)
---
 .../java/org/apache/iceberg/PartitionsTable.java   | 11 +++++
 .../spark/source/TestIcebergSourceTablesBase.java  | 49 +++++++++++++++++++
 .../spark/source/TestIcebergSourceTablesBase.java  | 55 ++++++++++++++++++++++
 .../spark/source/TestIcebergSourceTablesBase.java  | 54 +++++++++++++++++++++
 .../spark/source/TestIcebergSourceTablesBase.java  | 55 ++++++++++++++++++++++
 5 files changed, 224 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 71c5a29ef0..d93200c7cf 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -53,6 +53,11 @@ public class PartitionsTable extends BaseMetadataTable {
                 2, "record_count", Types.LongType.get(), "Count of records in data files"),
             Types.NestedField.required(
                 3, "file_count", Types.IntegerType.get(), "Count of data files"),
+            Types.NestedField.required(
+                11,
+                "total_data_file_size_in_bytes",
+                Types.LongType.get(),
+                "Total size in bytes of data files"),
             Types.NestedField.required(
                 5,
                 "position_delete_record_count",
@@ -97,6 +102,7 @@ public class PartitionsTable extends BaseMetadataTable {
       return schema.select(
           "record_count",
           "file_count",
+          "total_data_file_size_in_bytes",
           "position_delete_record_count",
           "position_delete_file_count",
           "equality_delete_record_count",
@@ -125,6 +131,7 @@ public class PartitionsTable extends BaseMetadataTable {
               StaticDataTask.Row.of(
                   root.dataRecordCount,
                   root.dataFileCount,
+                  root.dataFileSizeInBytes,
                   root.posDeleteRecordCount,
                   root.posDeleteFileCount,
                   root.eqDeleteRecordCount,
@@ -147,6 +154,7 @@ public class PartitionsTable extends BaseMetadataTable {
         partition.specId,
         partition.dataRecordCount,
         partition.dataFileCount,
+        partition.dataFileSizeInBytes,
         partition.posDeleteRecordCount,
         partition.posDeleteFileCount,
         partition.eqDeleteRecordCount,
@@ -269,6 +277,7 @@ public class PartitionsTable extends BaseMetadataTable {
     private int specId;
     private long dataRecordCount;
     private int dataFileCount;
+    private long dataFileSizeInBytes;
     private long posDeleteRecordCount;
     private int posDeleteFileCount;
     private long eqDeleteRecordCount;
@@ -281,6 +290,7 @@ public class PartitionsTable extends BaseMetadataTable {
       this.specId = 0;
       this.dataRecordCount = 0L;
       this.dataFileCount = 0;
+      this.dataFileSizeInBytes = 0L;
       this.posDeleteRecordCount = 0L;
       this.posDeleteFileCount = 0;
       this.eqDeleteRecordCount = 0L;
@@ -301,6 +311,7 @@ public class PartitionsTable extends BaseMetadataTable {
           this.dataRecordCount += file.recordCount();
           this.dataFileCount += 1;
           this.specId = file.specId();
+          this.dataFileSizeInBytes += file.fileSizeInBytes();
           break;
         case POSITION_DELETES:
           this.posDeleteRecordCount = file.recordCount();
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5f06243e6f..fcf61d5f75 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -25,6 +25,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.StringJoiner;
@@ -37,6 +38,7 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -1233,6 +1235,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         Types.StructType.of(
             required(2, "record_count", Types.LongType.get(), "Count of records in data files"),
             required(3, "file_count", Types.IntegerType.get(), "Count of data files"),
+            required(
+                11,
+                "total_data_file_size_in_bytes",
+                Types.LongType.get(),
+                "Total size in bytes of data files"),
             required(
                 5,
                 "position_delete_record_count",
@@ -1279,6 +1286,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1345,6 +1355,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1358,6 +1371,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1469,12 +1485,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         new GenericRecordBuilder(
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
+
+    List<DataFile> dataFiles = dataFiles(table);
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1488,6 +1509,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 2L)
             .set("file_count", 2)
+            .set(
+                "total_data_file_size_in_bytes",
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1525,6 +1549,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -2028,4 +2053,28 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+  }
+
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c8c682c01d..f90eb87ce1 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -25,6 +25,7 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.StringJoiner;
@@ -36,6 +37,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1238,6 +1240,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         Types.StructType.of(
             required(2, "record_count", Types.LongType.get(), "Count of records in data files"),
             required(3, "file_count", Types.IntegerType.get(), "Count of data files"),
+            required(
+                11,
+                "total_data_file_size_in_bytes",
+                Types.LongType.get(),
+                "Total size in bytes of data files"),
             required(
                 5,
                 "position_delete_record_count",
@@ -1284,6 +1291,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1350,6 +1360,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1363,6 +1376,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1474,12 +1490,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         new GenericRecordBuilder(
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
+
+    List<DataFile> dataFiles = dataFiles(table);
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1493,6 +1514,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 2L)
             .set("file_count", 2)
+            .set(
+                "total_data_file_size_in_bytes",
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1530,6 +1554,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1611,6 +1636,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1624,6 +1652,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 1L) // should be incremented now
             .set("position_delete_file_count", 1) // should be incremented now
             .set("equality_delete_record_count", 0L)
@@ -2159,4 +2190,28 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       throw new RuntimeException(e);
     }
   }
+
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+  }
+
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 199104c03e..c92e1c2759 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.StringJoiner;
@@ -39,6 +40,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1243,6 +1245,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         Types.StructType.of(
             required(2, "record_count", Types.LongType.get(), "Count of records in data files"),
             required(3, "file_count", Types.IntegerType.get(), "Count of data files"),
+            required(
+                11,
+                "total_data_file_size_in_bytes",
+                Types.LongType.get(),
+                "Total size in bytes of data files"),
             required(
                 5,
                 "position_delete_record_count",
@@ -1289,6 +1296,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1355,6 +1365,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1368,6 +1381,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1473,6 +1489,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .orderBy("partition.id")
             .collectAsList();
 
+    List<DataFile> dataFiles = dataFiles(table);
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
     GenericRecordBuilder builder =
         new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions"));
     GenericRecordBuilder partitionBuilder =
@@ -1485,6 +1504,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1498,6 +1518,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 2L)
             .set("file_count", 2)
+            .set(
+                "total_data_file_size_in_bytes",
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1535,6 +1558,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1616,6 +1640,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1629,6 +1656,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 1L) // should be incremented now
             .set("position_delete_file_count", 1) // should be incremented now
             .set("equality_delete_record_count", 0L)
@@ -2227,4 +2257,28 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       throw new RuntimeException(e);
     }
   }
+
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+  }
+
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index daed197cc9..8f78697212 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.StringJoiner;
@@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -1240,6 +1242,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         Types.StructType.of(
             required(2, "record_count", Types.LongType.get(), "Count of records in data files"),
             required(3, "file_count", Types.IntegerType.get(), "Count of data files"),
+            required(
+                11,
+                "total_data_file_size_in_bytes",
+                Types.LongType.get(),
+                "Total size in bytes of data files"),
             required(
                 5,
                 "position_delete_record_count",
@@ -1286,6 +1293,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("last_updated_snapshot_id", table.currentSnapshot().snapshotId())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1352,6 +1362,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1365,6 +1378,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1476,12 +1492,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         new GenericRecordBuilder(
             AvroSchemaUtil.convert(
                 partitionsTable.schema().findType("partition").asStructType(), "partition"));
+
+    List<DataFile> dataFiles = dataFiles(table);
+    assertDataFilePartitions(dataFiles, Arrays.asList(1, 2, 2));
+
     List<GenericData.Record> expected = Lists.newArrayList();
     expected.add(
         builder
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1495,6 +1516,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 2L)
             .set("file_count", 2)
+            .set(
+                "total_data_file_size_in_bytes",
+                dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1532,6 +1556,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes())
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1613,6 +1638,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 1).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 0L)
             .set("position_delete_file_count", 0)
             .set("equality_delete_record_count", 0L)
@@ -1626,6 +1654,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
             .set("partition", partitionBuilder.set("id", 2).build())
             .set("record_count", 1L)
             .set("file_count", 1)
+            .set(
+                "total_data_file_size_in_bytes",
+                totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io())))
             .set("position_delete_record_count", 1L) // should be incremented now
             .set("position_delete_file_count", 1) // should be incremented now
             .set("equality_delete_record_count", 0L)
@@ -2223,4 +2254,28 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       throw new RuntimeException(e);
     }
   }
+
+  private long totalSizeInBytes(Iterable<DataFile> dataFiles) {
+    return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
+  private List<DataFile> dataFiles(Table table) {
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+    return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+  }
+
+  private void assertDataFilePartitions(
+      List<DataFile> dataFiles, List<Integer> expectedPartitionIds) {
+    Assert.assertEquals(
+        "Table should have " + expectedPartitionIds.size() + " data files",
+        expectedPartitionIds.size(),
+        dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); ++i) {
+      Assert.assertEquals(
+          "Data file should have partition of id " + expectedPartitionIds.get(i),
+          expectedPartitionIds.get(i).intValue(),
+          dataFiles.get(i).partition().get(0, Integer.class).intValue());
+    }
+  }
 }

Reply via email to