This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f81e1e930 Core: Use scan API to read partition stats (#14989)
7f81e1e930 is described below

commit 7f81e1e93084e50fa3676c2e131722f66a26b385
Author: gaborkaszab <[email protected]>
AuthorDate: Mon Jan 12 14:14:03 2026 +0100

    Core: Use scan API to read partition stats (#14989)
---
 .../org/apache/iceberg/PartitionStatistics.java    |  15 ++
 .../apache/iceberg/BasePartitionStatistics.java    |  70 +++---
 .../org/apache/iceberg/PartitionStatsHandler.java  | 252 +++++++++++++++++----
 .../iceberg/PartitionStatisticsTestBase.java       |  26 +--
 .../iceberg/PartitionStatsHandlerTestBase.java     |  67 ++++--
 5 files changed, 325 insertions(+), 105 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java 
b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
index 10df7303d5..c0c4c07b7e 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
@@ -21,6 +21,21 @@ package org.apache.iceberg;
 /** Interface for partition statistics returned from a {@link 
PartitionStatisticsScan}. */
 public interface PartitionStatistics extends StructLike {
 
+  /* The position of each statistic within the full schema of partition 
statistics. */
+  int PARTITION_POSITION = 0;
+  int SPEC_ID_POSITION = 1;
+  int DATA_RECORD_COUNT_POSITION = 2;
+  int DATA_FILE_COUNT_POSITION = 3;
+  int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
+  int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
+  int POSITION_DELETE_FILE_COUNT_POSITION = 6;
+  int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
+  int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
+  int TOTAL_RECORD_COUNT_POSITION = 9;
+  int LAST_UPDATED_AT_POSITION = 10;
+  int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
+  int DV_COUNT_POSITION = 12;
+
   /** Returns the partition of these partition statistics */
   StructLike partition();
 
diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java 
b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
index c17718281b..4b1a3a6dba 100644
--- a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
@@ -40,8 +40,24 @@ public class BasePartitionStatistics extends 
SupportsIndexProjection
 
   private static final int STATS_COUNT = 13;
 
+  BasePartitionStatistics(StructLike partition, int specId) {
+    super(STATS_COUNT);
+
+    this.partition = partition;
+    this.specId = specId;
+
+    this.dataRecordCount = 0L;
+    this.dataFileCount = 0;
+    this.totalDataFileSizeInBytes = 0L;
+    this.positionDeleteRecordCount = 0L;
+    this.positionDeleteFileCount = 0;
+    this.equalityDeleteRecordCount = 0L;
+    this.equalityDeleteFileCount = 0;
+    this.dvCount = 0;
+  }
+
   /** Used by internal readers to instantiate this class with a projection 
schema. */
-  public BasePartitionStatistics(Types.StructType projection) {
+  BasePartitionStatistics(Types.StructType projection) {
     super(STATS_COUNT);
   }
 
@@ -117,31 +133,31 @@ public class BasePartitionStatistics extends 
SupportsIndexProjection
 
   private Object getByPos(int pos) {
     switch (pos) {
-      case 0:
+      case PARTITION_POSITION:
         return partition;
-      case 1:
+      case SPEC_ID_POSITION:
         return specId;
-      case 2:
+      case DATA_RECORD_COUNT_POSITION:
         return dataRecordCount;
-      case 3:
+      case DATA_FILE_COUNT_POSITION:
         return dataFileCount;
-      case 4:
+      case TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION:
         return totalDataFileSizeInBytes;
-      case 5:
+      case POSITION_DELETE_RECORD_COUNT_POSITION:
         return positionDeleteRecordCount;
-      case 6:
+      case POSITION_DELETE_FILE_COUNT_POSITION:
         return positionDeleteFileCount;
-      case 7:
+      case EQUALITY_DELETE_RECORD_COUNT_POSITION:
         return equalityDeleteRecordCount;
-      case 8:
+      case EQUALITY_DELETE_FILE_COUNT_POSITION:
         return equalityDeleteFileCount;
-      case 9:
+      case TOTAL_RECORD_COUNT_POSITION:
         return totalRecordCount;
-      case 10:
+      case LAST_UPDATED_AT_POSITION:
         return lastUpdatedAt;
-      case 11:
+      case LAST_UPDATED_SNAPSHOT_ID_POSITION:
         return lastUpdatedSnapshotId;
-      case 12:
+      case DV_COUNT_POSITION:
         return dvCount;
       default:
         throw new UnsupportedOperationException("Unknown position: " + pos);
@@ -155,43 +171,43 @@ public class BasePartitionStatistics extends 
SupportsIndexProjection
     }
 
     switch (pos) {
-      case 0:
+      case PARTITION_POSITION:
         this.partition = (StructLike) value;
         break;
-      case 1:
+      case SPEC_ID_POSITION:
         this.specId = (int) value;
         break;
-      case 2:
+      case DATA_RECORD_COUNT_POSITION:
         this.dataRecordCount = (long) value;
         break;
-      case 3:
+      case DATA_FILE_COUNT_POSITION:
         this.dataFileCount = (int) value;
         break;
-      case 4:
+      case TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION:
         this.totalDataFileSizeInBytes = (long) value;
         break;
-      case 5:
+      case POSITION_DELETE_RECORD_COUNT_POSITION:
         this.positionDeleteRecordCount = (long) value;
         break;
-      case 6:
+      case POSITION_DELETE_FILE_COUNT_POSITION:
         this.positionDeleteFileCount = (int) value;
         break;
-      case 7:
+      case EQUALITY_DELETE_RECORD_COUNT_POSITION:
         this.equalityDeleteRecordCount = (long) value;
         break;
-      case 8:
+      case EQUALITY_DELETE_FILE_COUNT_POSITION:
         this.equalityDeleteFileCount = (int) value;
         break;
-      case 9:
+      case TOTAL_RECORD_COUNT_POSITION:
         this.totalRecordCount = (Long) value;
         break;
-      case 10:
+      case LAST_UPDATED_AT_POSITION:
         this.lastUpdatedAt = (Long) value;
         break;
-      case 11:
+      case LAST_UPDATED_SNAPSHOT_ID_POSITION:
         this.lastUpdatedSnapshotId = (Long) value;
         break;
-      case 12:
+      case DV_COUNT_POSITION:
         this.dvCount = (int) value;
         break;
       default:
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java 
b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 7259a1f068..acf4ca3c0c 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -208,9 +208,7 @@ public class PartitionStatsHandler {
     Snapshot snapshot = table.snapshot(snapshotId);
     Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
 
-    StructType partitionType = Partitioning.partitionType(table);
-
-    Collection<PartitionStats> stats;
+    Collection<PartitionStatistics> stats;
     PartitionStatisticsFile statisticsFile = latestStatsFile(table, 
snapshot.snapshotId());
     if (statisticsFile == null) {
       LOG.info(
@@ -225,7 +223,7 @@ public class PartitionStatsHandler {
       }
 
       try {
-        stats = computeAndMergeStatsIncremental(table, snapshot, 
partitionType, statisticsFile);
+        stats = computeAndMergeStatsIncremental(table, snapshot, 
statisticsFile.snapshotId());
       } catch (InvalidStatsFileException exception) {
         LOG.warn(
             "Using full compute as previous statistics file is corrupted for 
incremental compute.");
@@ -240,7 +238,8 @@ public class PartitionStatsHandler {
       return null;
     }
 
-    List<PartitionStats> sortedStats = sortStatsByPartition(stats, 
partitionType);
+    StructType partitionType = Partitioning.partitionType(table);
+    List<PartitionStatistics> sortedStats = sortStatsByPartition(stats, 
partitionType);
     return writePartitionStatsFile(
         table,
         snapshot.snapshotId(),
@@ -250,7 +249,7 @@ public class PartitionStatsHandler {
 
   @VisibleForTesting
   static PartitionStatisticsFile writePartitionStatsFile(
-      Table table, long snapshotId, Schema dataSchema, 
Iterable<PartitionStats> records)
+      Table table, long snapshotId, Schema dataSchema, 
Iterable<PartitionStatistics> records)
       throws IOException {
     FileFormat fileFormat =
         FileFormat.fromString(
@@ -322,17 +321,11 @@ public class PartitionStatsHandler {
     return stats;
   }
 
-  private static Collection<PartitionStats> computeAndMergeStatsIncremental(
-      Table table,
-      Snapshot snapshot,
-      StructType partitionType,
-      PartitionStatisticsFile previousStatsFile) {
-    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
-    // read previous stats, note that partition field will be read as 
GenericRecord
-    try (CloseableIterable<PartitionStats> oldStats =
-        readPartitionStatsFile(
-            schema(partitionType, TableUtil.formatVersion(table)),
-            table.io().newInputFile(previousStatsFile.path()))) {
+  private static Collection<PartitionStatistics> 
computeAndMergeStatsIncremental(
+      Table table, Snapshot snapshot, long lastSnapshotWithStats) {
+    PartitionMap<PartitionStatistics> statsMap = 
PartitionMap.create(table.specs());
+    try (CloseableIterable<PartitionStatistics> oldStats =
+        
table.newPartitionStatisticsScan().useSnapshot(lastSnapshotWithStats).scan()) {
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), 
partitionStats.partition(), partitionStats));
@@ -341,8 +334,8 @@ public class PartitionStatsHandler {
     }
 
     // incrementally compute the new stats, partition field will be written as 
PartitionData
-    PartitionMap<PartitionStats> incrementalStatsMap =
-        computeStatsDiff(table, 
table.snapshot(previousStatsFile.snapshotId()), snapshot);
+    PartitionMap<PartitionStatistics> incrementalStatsMap =
+        computeStatsDiff(table, table.snapshot(lastSnapshotWithStats), 
snapshot);
 
     // convert PartitionData into GenericRecord and merge stats
     incrementalStatsMap.forEach(
@@ -351,7 +344,7 @@ public class PartitionStatsHandler {
                 Pair.of(key.first(), partitionDataToRecord((PartitionData) 
key.second())),
                 value,
                 (existingEntry, newEntry) -> {
-                  existingEntry.appendStats(newEntry);
+                  appendStats(existingEntry, newEntry);
                   return existingEntry;
                 }));
 
@@ -389,7 +382,7 @@ public class PartitionStatsHandler {
     return null;
   }
 
-  private static PartitionMap<PartitionStats> computeStatsDiff(
+  private static PartitionMap<PartitionStatistics> computeStatsDiff(
       Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
     Iterable<Snapshot> snapshots =
         SnapshotUtil.ancestorsBetween(
@@ -408,10 +401,10 @@ public class PartitionStatsHandler {
     return computeStats(table, manifests, true /* incremental */);
   }
 
-  private static PartitionMap<PartitionStats> computeStats(
+  private static PartitionMap<PartitionStatistics> computeStats(
       Table table, List<ManifestFile> manifests, boolean incremental) {
     StructType partitionType = Partitioning.partitionType(table);
-    Queue<PartitionMap<PartitionStats>> statsByManifest = 
Queues.newConcurrentLinkedQueue();
+    Queue<PartitionMap<PartitionStatistics>> statsByManifest = 
Queues.newConcurrentLinkedQueue();
     Tasks.foreach(manifests)
         .stopOnFailure()
         .throwFailureWhenFinished()
@@ -421,19 +414,19 @@ public class PartitionStatsHandler {
                 statsByManifest.add(
                     collectStatsForManifest(table, manifest, partitionType, 
incremental)));
 
-    PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
-    for (PartitionMap<PartitionStats> stats : statsByManifest) {
+    PartitionMap<PartitionStatistics> statsMap = 
PartitionMap.create(table.specs());
+    for (PartitionMap<PartitionStatistics> stats : statsByManifest) {
       mergePartitionMap(stats, statsMap);
     }
 
     return statsMap;
   }
 
-  private static PartitionMap<PartitionStats> collectStatsForManifest(
+  private static PartitionMap<PartitionStatistics> collectStatsForManifest(
       Table table, ManifestFile manifest, StructType partitionType, boolean 
incremental) {
     List<String> projection = BaseScan.scanColumns(manifest.content());
     try (ManifestReader<?> reader = ManifestFiles.open(manifest, 
table.io()).select(projection)) {
-      PartitionMap<PartitionStats> statsMap = 
PartitionMap.create(table.specs());
+      PartitionMap<PartitionStatistics> statsMap = 
PartitionMap.create(table.specs());
       int specId = manifest.partitionSpecId();
       PartitionSpec spec = table.specs().get(specId);
       PartitionData keyTemplate = new PartitionData(partitionType);
@@ -444,22 +437,22 @@ public class PartitionStatsHandler {
             PartitionUtil.coercePartition(partitionType, spec, 
file.partition());
         StructLike key = keyTemplate.copyFor(coercedPartition);
         Snapshot snapshot = table.snapshot(entry.snapshotId());
-        PartitionStats stats =
+        PartitionStatistics stats =
             statsMap.computeIfAbsent(
                 specId,
                 ((PartitionData) file.partition()).copy(),
-                () -> new PartitionStats(key, specId));
+                () -> new BasePartitionStatistics(key, specId));
         if (entry.isLive()) {
           // Live can have both added and existing entries. Consider only 
added entries for
           // incremental compute as existing entries was already included in 
previous compute.
           if (!incremental || entry.status() == ManifestEntry.Status.ADDED) {
-            stats.liveEntry(file, snapshot);
+            liveEntry(stats, file, snapshot);
           }
         } else {
           if (incremental) {
-            stats.deletedEntryForIncrementalCompute(file, snapshot);
+            deletedEntryForIncrementalCompute(stats, file, snapshot);
           } else {
-            stats.deletedEntry(snapshot);
+            deletedEntry(stats, snapshot);
           }
         }
       }
@@ -471,26 +464,209 @@ public class PartitionStatsHandler {
   }
 
   private static void mergePartitionMap(
-      PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats> 
toMap) {
+      PartitionMap<PartitionStatistics> fromMap, 
PartitionMap<PartitionStatistics> toMap) {
     fromMap.forEach(
         (key, value) ->
             toMap.merge(
                 key,
                 value,
                 (existingEntry, newEntry) -> {
-                  existingEntry.appendStats(newEntry);
+                  appendStats(existingEntry, newEntry);
                   return existingEntry;
                 }));
   }
 
-  private static List<PartitionStats> sortStatsByPartition(
-      Collection<PartitionStats> stats, StructType partitionType) {
-    List<PartitionStats> entries = Lists.newArrayList(stats);
+  private static List<PartitionStatistics> sortStatsByPartition(
+      Collection<PartitionStatistics> stats, StructType partitionType) {
+    List<PartitionStatistics> entries = Lists.newArrayList(stats);
     entries.sort(
-        Comparator.comparing(PartitionStats::partition, 
Comparators.forType(partitionType)));
+        Comparator.comparing(PartitionStatistics::partition, 
Comparators.forType(partitionType)));
     return entries;
   }
 
+  /**
+   * Updates the partition stats from the data/delete file.
+   *
+   * @param stats partition statistics to be updated.
+   * @param file the {@link ContentFile} from the manifest entry.
+   * @param snapshot the snapshot corresponding to the live entry.
+   */
+  private static void liveEntry(PartitionStatistics stats, ContentFile<?> 
file, Snapshot snapshot) {
+    Preconditions.checkArgument(stats.specId() == file.specId(), "Spec IDs 
must match");
+
+    switch (file.content()) {
+      case DATA:
+        stats.set(
+            PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+            stats.dataRecordCount() + file.recordCount());
+        stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION, 
stats.dataFileCount() + 1);
+        stats.set(
+            PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+            stats.totalDataFileSizeInBytes() + file.fileSizeInBytes());
+        break;
+      case POSITION_DELETES:
+        stats.set(
+            PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+            stats.positionDeleteRecordCount() + file.recordCount());
+        if (file.format() == FileFormat.PUFFIN) {
+          stats.set(PartitionStatistics.DV_COUNT_POSITION, stats.dvCount() + 
1);
+        } else {
+          stats.set(
+              PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+              stats.positionDeleteFileCount() + 1);
+        }
+
+        break;
+      case EQUALITY_DELETES:
+        stats.set(
+            PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+            stats.equalityDeleteRecordCount() + file.recordCount());
+        stats.set(
+            PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+            stats.equalityDeleteFileCount() + 1);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported file content 
type: " + file.content());
+    }
+
+    if (snapshot != null) {
+      updateSnapshotInfo(stats, snapshot.snapshotId(), 
snapshot.timestampMillis());
+    }
+
+    // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs 
scanning the data.
+  }
+
+  /**
+   * Updates the modified time and snapshot ID in stats for the deleted 
manifest entry.
+   *
+   * @param stats partition statistics to be updated.
+   * @param snapshot the snapshot corresponding to the deleted manifest entry.
+   */
+  private static void deletedEntry(PartitionStatistics stats, Snapshot 
snapshot) {
+    if (snapshot != null) {
+      updateSnapshotInfo(stats, snapshot.snapshotId(), 
snapshot.timestampMillis());
+    }
+  }
+
+  /**
+   * Decrements the counters in stats, as they were included in the previous 
stats, and updates
+   * the modified time and snapshot ID for the deleted manifest entry.
+   *
+   * @param stats partition statistics to be updated.
+   * @param file the {@link ContentFile} from the deleted manifest entry.
+   * @param snapshot the snapshot corresponding to the deleted manifest entry.
+   */
+  private static void deletedEntryForIncrementalCompute(
+      PartitionStatistics stats, ContentFile<?> file, Snapshot snapshot) {
+    Preconditions.checkArgument(stats.specId() == file.specId(), "Spec IDs 
must match");
+
+    switch (file.content()) {
+      case DATA:
+        stats.set(
+            PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+            stats.dataRecordCount() - file.recordCount());
+        stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION, 
stats.dataFileCount() - 1);
+        stats.set(
+            PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+            stats.totalDataFileSizeInBytes() - file.fileSizeInBytes());
+        break;
+      case POSITION_DELETES:
+        stats.set(
+            PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+            stats.positionDeleteRecordCount() - file.recordCount());
+        if (file.format() == FileFormat.PUFFIN) {
+          stats.set(PartitionStatistics.DV_COUNT_POSITION, stats.dvCount() - 
1);
+        } else {
+          stats.set(
+              PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+              stats.positionDeleteFileCount() - 1);
+        }
+
+        break;
+      case EQUALITY_DELETES:
+        stats.set(
+            PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+            stats.equalityDeleteRecordCount() - file.recordCount());
+        stats.set(
+            PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+            stats.equalityDeleteFileCount() - 1);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported file content 
type: " + file.content());
+    }
+
+    if (snapshot != null) {
+      updateSnapshotInfo(stats, snapshot.snapshotId(), 
snapshot.timestampMillis());
+    }
+  }
+
+  /**
+   * Appends statistics from given entry to another entry.
+   *
+   * @param targetStats partition statistics to be updated.
+   * @param inputStats the partition statistics used as input.
+   */
+  private static void appendStats(PartitionStatistics targetStats, 
PartitionStatistics inputStats) {
+    Preconditions.checkArgument(targetStats.specId() != null, "Invalid spec 
ID: null");
+    Preconditions.checkArgument(
+        targetStats.specId().equals(inputStats.specId()), "Spec IDs must 
match");
+
+    // This is expected to be called on the compute/write path where we use 
full schemas, hence
+    // these members can't be null.
+    targetStats.set(
+        PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+        targetStats.dataRecordCount() + inputStats.dataRecordCount());
+    targetStats.set(
+        PartitionStatistics.DATA_FILE_COUNT_POSITION,
+        targetStats.dataFileCount() + inputStats.dataFileCount());
+    targetStats.set(
+        PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+        targetStats.totalDataFileSizeInBytes() + 
inputStats.totalDataFileSizeInBytes());
+    targetStats.set(
+        PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+        targetStats.positionDeleteRecordCount() + 
inputStats.positionDeleteRecordCount());
+    targetStats.set(
+        PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+        targetStats.positionDeleteFileCount() + 
inputStats.positionDeleteFileCount());
+    targetStats.set(
+        PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+        targetStats.equalityDeleteRecordCount() + 
inputStats.equalityDeleteRecordCount());
+    targetStats.set(
+        PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+        targetStats.equalityDeleteFileCount() + 
inputStats.equalityDeleteFileCount());
+
+    if (inputStats.dvCount() != null) {
+      if (targetStats.dvCount() == null) {
+        targetStats.set(PartitionStatistics.DV_COUNT_POSITION, 
inputStats.dvCount());
+      } else {
+        targetStats.set(
+            PartitionStatistics.DV_COUNT_POSITION, targetStats.dvCount() + 
inputStats.dvCount());
+      }
+    }
+
+    if (inputStats.totalRecords() != null) {
+      if (targetStats.totalRecords() == null) {
+        targetStats.set(PartitionStatistics.TOTAL_RECORD_COUNT_POSITION, 
inputStats.totalRecords());
+      } else {
+        targetStats.set(
+            PartitionStatistics.TOTAL_RECORD_COUNT_POSITION,
+            targetStats.totalRecords() + inputStats.totalRecords());
+      }
+    }
+
+    if (inputStats.lastUpdatedAt() != null) {
+      updateSnapshotInfo(
+          targetStats, inputStats.lastUpdatedSnapshotId(), 
inputStats.lastUpdatedAt());
+    }
+  }
+
+  private static void updateSnapshotInfo(
+      PartitionStatistics stats, long snapshotId, long updatedAt) {
+    if (stats.lastUpdatedAt() == null || stats.lastUpdatedAt() < updatedAt) {
+      stats.set(PartitionStatistics.LAST_UPDATED_AT_POSITION, updatedAt);
+      stats.set(PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID_POSITION, 
snapshotId);
+    }
+  }
+
   private static class InvalidStatsFileException extends RuntimeException {
 
     InvalidStatsFileException(Throwable cause) {
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
index 72a5405d7f..f6326e228c 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
@@ -44,19 +44,6 @@ public abstract class PartitionStatisticsTestBase {
 
   @TempDir private File temp;
 
-  // positions in StructLike
-  protected static final int DATA_RECORD_COUNT_POSITION = 2;
-  protected static final int DATA_FILE_COUNT_POSITION = 3;
-  protected static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
-  protected static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
-  protected static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
-  protected static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
-  protected static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
-  protected static final int TOTAL_RECORD_COUNT_POSITION = 9;
-  protected static final int LAST_UPDATED_AT_POSITION = 10;
-  protected static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
-  protected static final int DV_COUNT_POSITION = 12;
-
   protected static final Schema SCHEMA =
       new Schema(
           optional(1, "c1", Types.IntegerType.get()),
@@ -85,18 +72,19 @@ public abstract class PartitionStatisticsTestBase {
         Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(), 
Types.LongType.get()));
   }
 
-  protected PartitionStats randomStats(Types.StructType partitionType) {
+  protected PartitionStatistics randomStats(Types.StructType partitionType) {
     PartitionData partitionData = new PartitionData(partitionType);
     partitionData.set(0, RANDOM.nextInt());
 
     return randomStats(partitionData);
   }
 
-  protected PartitionStats randomStats(PartitionData partitionData) {
-    PartitionStats stats = new PartitionStats(partitionData, 
RANDOM.nextInt(10));
-    stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
-    stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
-    stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
+  protected PartitionStatistics randomStats(PartitionData partitionData) {
+    PartitionStatistics stats = new BasePartitionStatistics(partitionData, 
RANDOM.nextInt(10));
+    stats.set(PartitionStatistics.DATA_RECORD_COUNT_POSITION, 
RANDOM.nextLong());
+    stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
+    stats.set(
+        PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * 
RANDOM.nextInt(20));
     return stats;
   }
 
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index 9b93013a9b..fd1e5ffe29 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -180,8 +180,8 @@ public abstract class PartitionStatsHandlerTestBase extends 
PartitionStatisticsT
     partitionData.set(13, new BigDecimal("12345678901234567890.1234567890"));
     partitionData.set(14, 
Literal.of("10:10:10").to(Types.TimeType.get()).value());
 
-    PartitionStats partitionStats = randomStats(partitionData);
-    List<PartitionStats> expected = Collections.singletonList(partitionStats);
+    PartitionStatistics partitionStats = randomStats(partitionData);
+    List<PartitionStatistics> expected = 
Collections.singletonList(partitionStats);
     PartitionStatisticsFile statisticsFile =
         PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, 
dataSchema, expected);
 
@@ -214,34 +214,34 @@ public abstract class PartitionStatsHandlerTestBase 
extends PartitionStatisticsT
     Types.StructType partitionSchema = Partitioning.partitionType(testTable);
     Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, 
formatVersion);
 
-    ImmutableList.Builder<PartitionStats> partitionListBuilder = 
ImmutableList.builder();
+    ImmutableList.Builder<PartitionStatistics> partitionListBuilder = 
ImmutableList.builder();
     for (int i = 0; i < 5; i++) {
-      PartitionStats stats =
+      PartitionStatistics stats =
           
randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
-      stats.set(POSITION_DELETE_RECORD_COUNT_POSITION, null);
-      stats.set(POSITION_DELETE_FILE_COUNT_POSITION, null);
-      stats.set(EQUALITY_DELETE_RECORD_COUNT_POSITION, null);
-      stats.set(EQUALITY_DELETE_FILE_COUNT_POSITION, null);
-      stats.set(TOTAL_RECORD_COUNT_POSITION, null);
-      stats.set(LAST_UPDATED_AT_POSITION, null);
-      stats.set(LAST_UPDATED_SNAPSHOT_ID_POSITION, null);
-      stats.set(DV_COUNT_POSITION, null);
+      stats.set(PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION, 
null);
+      stats.set(PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION, null);
+      stats.set(PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION, 
null);
+      stats.set(PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION, null);
+      stats.set(PartitionStatistics.TOTAL_RECORD_COUNT_POSITION, null);
+      stats.set(PartitionStatistics.LAST_UPDATED_AT_POSITION, null);
+      stats.set(PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID_POSITION, null);
+      stats.set(PartitionStatistics.DV_COUNT_POSITION, null);
 
       partitionListBuilder.add(stats);
     }
 
-    List<PartitionStats> expected = partitionListBuilder.build();
+    List<PartitionStatistics> expected = partitionListBuilder.build();
 
     assertThat(expected.get(0))
         .extracting(
-            PartitionStats::positionDeleteRecordCount,
-            PartitionStats::positionDeleteFileCount,
-            PartitionStats::equalityDeleteRecordCount,
-            PartitionStats::equalityDeleteFileCount,
-            PartitionStats::totalRecords,
-            PartitionStats::lastUpdatedAt,
-            PartitionStats::lastUpdatedSnapshotId,
-            PartitionStats::dvCount)
+            PartitionStatistics::positionDeleteRecordCount,
+            PartitionStatistics::positionDeleteFileCount,
+            PartitionStatistics::equalityDeleteRecordCount,
+            PartitionStatistics::equalityDeleteFileCount,
+            PartitionStatistics::totalRecords,
+            PartitionStatistics::lastUpdatedAt,
+            PartitionStatistics::lastUpdatedSnapshotId,
+            PartitionStatistics::dvCount)
         .isEqualTo(
             Arrays.asList(
                 0L, 0, 0L, 0, null, null, null, 0)); // null counters must be 
initialized to zero.
@@ -734,4 +734,29 @@ public abstract class PartitionStatsHandlerTestBase 
extends PartitionStatisticsT
         && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
         && Objects.equals(stats1.lastUpdatedSnapshotId(), 
stats2.lastUpdatedSnapshotId());
   }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static boolean isEqual(
+      Comparator<StructLike> partitionComparator,
+      PartitionStats stats1,
+      PartitionStatistics stats2) {
+    if (stats1 == stats2) {
+      return true;
+    } else if (stats1 == null || stats2 == null) {
+      return false;
+    }
+
+    return partitionComparator.compare(stats1.partition(), stats2.partition()) 
== 0
+        && stats1.specId() == stats2.specId()
+        && stats1.dataRecordCount() == stats2.dataRecordCount()
+        && stats1.dataFileCount() == stats2.dataFileCount()
+        && stats1.totalDataFileSizeInBytes() == 
stats2.totalDataFileSizeInBytes()
+        && stats1.positionDeleteRecordCount() == 
stats2.positionDeleteRecordCount()
+        && stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
+        && stats1.equalityDeleteRecordCount() == 
stats2.equalityDeleteRecordCount()
+        && stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
+        && Objects.equals(stats1.totalRecords(), stats2.totalRecords())
+        && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
+        && Objects.equals(stats1.lastUpdatedSnapshotId(), 
stats2.lastUpdatedSnapshotId());
+  }
 }

Reply via email to