This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ada622509 Core: Fix incremental compute of partition stats for 
various edge cases (#13163)
2ada622509 is described below

commit 2ada622509ad1e57c9df70016875c8bbf119cb95
Author: Ajantha Bhat <[email protected]>
AuthorDate: Thu May 29 16:50:49 2025 +0530

    Core: Fix incremental compute of partition stats for various edge cases 
(#13163)
---
 .../org/apache/iceberg/PartitionStatsHandler.java  | 44 ++++++-----
 .../iceberg/PartitionStatsHandlerTestBase.java     | 92 +++++++++++++++++++++-
 .../iceberg/TestOrcPartitionStatsHandler.java      | 14 ++++
 3 files changed, 128 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java 
b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index aeb6b8b4c7..3fb2e790a2 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -29,9 +29,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
@@ -39,10 +39,8 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.LongType;
@@ -163,7 +161,8 @@ public class PartitionStatsHandler {
     if (statisticsFile == null) {
       LOG.info(
           "Using full compute as previous statistics file is not present for 
incremental compute.");
-      stats = computeStats(table, snapshot, file -> true, false /* incremental 
*/).values();
+      stats =
+          computeStats(table, snapshot.allManifests(table.io()), false /* 
incremental */).values();
     } else {
       stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, 
statisticsFile);
     }
@@ -276,7 +275,8 @@ public class PartitionStatsHandler {
     PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
     // read previous stats, note that partition field will be read as 
GenericRecord
     try (CloseableIterable<PartitionStats> oldStats =
-        readPartitionStatsFile(schema(partitionType), 
Files.localInput(previousStatsFile.path()))) {
+        readPartitionStatsFile(
+            schema(partitionType), 
table.io().newInputFile(previousStatsFile.path()))) {
       oldStats.forEach(
           partitionStats ->
               statsMap.put(partitionStats.specId(), 
partitionStats.partition(), partitionStats));
@@ -325,28 +325,34 @@ public class PartitionStatsHandler {
       }
     }
 
-    // This is unlikely to happen.
-    throw new RuntimeException(
-        "Unable to find previous stats with valid snapshot. Invalidate 
partition stats for all the snapshots to use full compute.");
+    // A stats file exists but isn't accessible from the current snapshot 
chain.
+    // It may belong to a different snapshot reference (like a branch or tag).
+    // Falling back to full computation for current snapshot.
+    return null;
   }
 
   private static PartitionMap<PartitionStats> computeStatsDiff(
       Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
-    Set<Long> snapshotIdsRange =
-        Sets.newHashSet(
-            SnapshotUtil.ancestorIdsBetween(
-                toSnapshot.snapshotId(), fromSnapshot.snapshotId(), 
table::snapshot));
-    Predicate<ManifestFile> manifestFilePredicate =
-        manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
-    return computeStats(table, toSnapshot, manifestFilePredicate, true /* 
incremental */);
+    Iterable<Snapshot> snapshots =
+        SnapshotUtil.ancestorsBetween(
+            toSnapshot.snapshotId(), fromSnapshot.snapshotId(), 
table::snapshot);
+    // DELETED manifest entries are not carried over to subsequent snapshots.
+    // So, for incremental computation, gather the manifests added by each 
snapshot
+    // instead of relying solely on those from the latest snapshot.
+    List<ManifestFile> manifests =
+        StreamSupport.stream(snapshots.spliterator(), false)
+            .flatMap(
+                snapshot ->
+                    snapshot.allManifests(table.io()).stream()
+                        .filter(file -> 
file.snapshotId().equals(snapshot.snapshotId())))
+            .collect(Collectors.toList());
+
+    return computeStats(table, manifests, true /* incremental */);
   }
 
   private static PartitionMap<PartitionStats> computeStats(
-      Table table, Snapshot snapshot, Predicate<ManifestFile> predicate, 
boolean incremental) {
+      Table table, List<ManifestFile> manifests, boolean incremental) {
     StructType partitionType = Partitioning.partitionType(table);
-    List<ManifestFile> manifests =
-        
snapshot.allManifests(table.io()).stream().filter(predicate).collect(Collectors.toList());
-
     Queue<PartitionMap<PartitionStats>> statsByManifest = 
Queues.newConcurrentLinkedQueue();
     Tasks.foreach(manifests)
         .stopOnFailure()
diff --git 
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java 
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index a00ad76e1d..1c56fae2de 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -204,7 +204,7 @@ public abstract class PartitionStatsHandlerTestBase {
     List<PartitionStats> written;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
-            dataSchema, Files.localInput(statisticsFile.path()))) {
+            dataSchema, testTable.io().newInputFile(statisticsFile.path()))) {
       written = Lists.newArrayList(recordIterator);
     }
 
@@ -273,7 +273,7 @@ public abstract class PartitionStatsHandlerTestBase {
     List<PartitionStats> written;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
-            dataSchema, Files.localInput(statisticsFile.path()))) {
+            dataSchema, testTable.io().newInputFile(statisticsFile.path()))) {
       written = Lists.newArrayList(recordIterator);
     }
 
@@ -441,6 +441,52 @@ public abstract class PartitionStatsHandlerTestBase {
             snapshot1.snapshotId()));
   }
 
+  @Test
+  public void testCopyOnWriteDelete() throws Exception {
+    Table testTable =
+        TestTables.create(tempDir("my_test"), "my_test", SCHEMA, SPEC, 2, 
fileFormatProperty);
+
+    DataFile dataFile1 =
+        DataFiles.builder(SPEC)
+            .withPath("/df1.parquet")
+            .withPartitionPath("c2=a/c3=a")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DataFile dataFile2 =
+        DataFiles.builder(SPEC)
+            .withPath("/df2.parquet")
+            .withPartitionPath("c2=b/c3=b")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable);
+    
testTable.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+
+    assertThat(
+            PartitionStatsHandler.readPartitionStatsFile(
+                
PartitionStatsHandler.schema(Partitioning.partitionType(testTable)),
+                testTable.io().newInputFile(statisticsFile.path())))
+        .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0));
+
+    testTable.newDelete().deleteFile(dataFile1).commit();
+    testTable.newDelete().deleteFile(dataFile2).commit();
+
+    PartitionStatisticsFile statisticsFileNew =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable);
+
+    // stats must be decremented to zero as all the files are removed from the table.
+    assertThat(
+            PartitionStatsHandler.readPartitionStatsFile(
+                
PartitionStatsHandler.schema(Partitioning.partitionType(testTable)),
+                testTable.io().newInputFile(statisticsFileNew.path())))
+        .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0));
+  }
+
   @Test
   public void testLatestStatsFile() throws Exception {
     Table testTable =
@@ -476,6 +522,46 @@ public abstract class PartitionStatsHandlerTestBase {
     assertThat(latestStatsFile).isEqualTo(statisticsFile);
   }
 
+  @Test
+  public void testLatestStatsFileWithBranch() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("stats_file_branch"), "stats_file_branch", SCHEMA, SPEC, 
2, fileFormatProperty);
+    DataFile dataFile =
+        FileGenerationUtil.generateDataFile(testTable, 
TestHelpers.Row.of("foo", "A"));
+
+    /*
+                                             * [statsMainB]
+          ---- snapshotA  ------ snapshotMainB
+                        \
+                         \
+                          \
+                           snapshotBranchB(branch:b1)
+    */
+
+    testTable.newAppend().appendFile(dataFile).commit();
+    long snapshotAId = testTable.currentSnapshot().snapshotId();
+
+    testTable.newAppend().appendFile(dataFile).commit();
+    long snapshotMainBId = testTable.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    testTable.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+    testTable.newAppend().appendFile(dataFile).commit();
+    long snapshotBranchBId = testTable.snapshot(branchName).snapshotId();
+
+    PartitionStatisticsFile statsMainB =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable, 
snapshotMainBId);
+    
testTable.updatePartitionStatistics().setPartitionStatistics(statsMainB).commit();
+
+    // should find latest stats for snapshotMainB
+    assertThat(PartitionStatsHandler.latestStatsFile(testTable, 
snapshotMainBId))
+        .isEqualTo(statsMainB);
+
+    // should not find latest stats for snapshotBranchB
+    assertThat(PartitionStatsHandler.latestStatsFile(testTable, 
snapshotBranchBId)).isNull();
+  }
+
   private static StructLike partitionRecord(
       Types.StructType partitionType, String val1, String val2) {
     GenericRecord record = GenericRecord.create(partitionType);
@@ -496,7 +582,7 @@ public abstract class PartitionStatsHandlerTestBase {
     List<PartitionStats> partitionStats;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
-            recordSchema, Files.localInput(result.path()))) {
+            recordSchema, testTable.io().newInputFile(result.path()))) {
       partitionStats = Lists.newArrayList(recordIterator);
     }
 
diff --git 
a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java 
b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
index c26c004297..f91b86b234 100644
--- a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
+++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
@@ -53,4 +53,18 @@ public class TestOrcPartitionStatsHandler extends 
PartitionStatsHandlerTestBase
         .isInstanceOf(UnsupportedOperationException.class)
         .hasMessage("Cannot write using unregistered internal data format: 
ORC");
   }
+
+  @Override
+  public void testLatestStatsFileWithBranch() throws Exception {
+    assertThatThrownBy(super::testLatestStatsFileWithBranch)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
+
+  @Override
+  public void testCopyOnWriteDelete() throws Exception {
+    assertThatThrownBy(super::testCopyOnWriteDelete)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: 
ORC");
+  }
 }

Reply via email to