This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2ada622509 Core: Fix incremental compute of partition stats for various edge cases (#13163)
2ada622509 is described below
commit 2ada622509ad1e57c9df70016875c8bbf119cb95
Author: Ajantha Bhat <[email protected]>
AuthorDate: Thu May 29 16:50:49 2025 +0530
Core: Fix incremental compute of partition stats for various edge cases (#13163)
---
.../org/apache/iceberg/PartitionStatsHandler.java | 44 ++++++-----
.../iceberg/PartitionStatsHandlerTestBase.java | 92 +++++++++++++++++++++-
.../iceberg/TestOrcPartitionStatsHandler.java | 14 ++++
3 files changed, 128 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index aeb6b8b4c7..3fb2e790a2 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -29,9 +29,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
@@ -39,10 +39,8 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.base.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
@@ -163,7 +161,8 @@ public class PartitionStatsHandler {
if (statisticsFile == null) {
      LOG.info(
          "Using full compute as previous statistics file is not present for incremental compute.");
-      stats = computeStats(table, snapshot, file -> true, false /* incremental */).values();
+      stats =
+          computeStats(table, snapshot.allManifests(table.io()), false /* incremental */).values();
} else {
stats = computeAndMergeStatsIncremental(table, snapshot, partitionType,
statisticsFile);
}
@@ -276,7 +275,8 @@ public class PartitionStatsHandler {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
    // read previous stats, note that partition field will be read as GenericRecord
    try (CloseableIterable<PartitionStats> oldStats =
-        readPartitionStatsFile(schema(partitionType), Files.localInput(previousStatsFile.path()))) {
+        readPartitionStatsFile(
+            schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) {
      oldStats.forEach(
          partitionStats ->
              statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
@@ -325,28 +325,34 @@ public class PartitionStatsHandler {
}
}
-    // This is unlikely to happen.
-    throw new RuntimeException(
-        "Unable to find previous stats with valid snapshot. Invalidate partition stats for all the snapshots to use full compute.");
+    // A stats file exists but isn't accessible from the current snapshot chain.
+    // It may belong to a different snapshot reference (like a branch or tag).
+    // Falling back to full computation for current snapshot.
+    return null;
}
private static PartitionMap<PartitionStats> computeStatsDiff(
Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
-    Set<Long> snapshotIdsRange =
-        Sets.newHashSet(
-            SnapshotUtil.ancestorIdsBetween(
-                toSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot));
-    Predicate<ManifestFile> manifestFilePredicate =
-        manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
-    return computeStats(table, toSnapshot, manifestFilePredicate, true /* incremental */);
+    Iterable<Snapshot> snapshots =
+        SnapshotUtil.ancestorsBetween(
+            toSnapshot.snapshotId(), fromSnapshot.snapshotId(), table::snapshot);
+    // DELETED manifest entries are not carried over to subsequent snapshots.
+    // So, for incremental computation, gather the manifests added by each snapshot
+    // instead of relying solely on those from the latest snapshot.
+    List<ManifestFile> manifests =
+        StreamSupport.stream(snapshots.spliterator(), false)
+            .flatMap(
+                snapshot ->
+                    snapshot.allManifests(table.io()).stream()
+                        .filter(file -> file.snapshotId().equals(snapshot.snapshotId())))
+            .collect(Collectors.toList());
+
+ return computeStats(table, manifests, true /* incremental */);
}
   private static PartitionMap<PartitionStats> computeStats(
-      Table table, Snapshot snapshot, Predicate<ManifestFile> predicate, boolean incremental) {
+      Table table, List<ManifestFile> manifests, boolean incremental) {
     StructType partitionType = Partitioning.partitionType(table);
-    List<ManifestFile> manifests =
-        snapshot.allManifests(table.io()).stream().filter(predicate).collect(Collectors.toList());
-
     Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index a00ad76e1d..1c56fae2de 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -204,7 +204,7 @@ public abstract class PartitionStatsHandlerTestBase {
List<PartitionStats> written;
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
- dataSchema, Files.localInput(statisticsFile.path()))) {
+ dataSchema, testTable.io().newInputFile(statisticsFile.path()))) {
written = Lists.newArrayList(recordIterator);
}
@@ -273,7 +273,7 @@ public abstract class PartitionStatsHandlerTestBase {
List<PartitionStats> written;
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
- dataSchema, Files.localInput(statisticsFile.path()))) {
+ dataSchema, testTable.io().newInputFile(statisticsFile.path()))) {
written = Lists.newArrayList(recordIterator);
}
@@ -441,6 +441,52 @@ public abstract class PartitionStatsHandlerTestBase {
snapshot1.snapshotId()));
}
+ @Test
+ public void testCopyOnWriteDelete() throws Exception {
+    Table testTable =
+        TestTables.create(tempDir("my_test"), "my_test", SCHEMA, SPEC, 2, fileFormatProperty);
+
+ DataFile dataFile1 =
+ DataFiles.builder(SPEC)
+ .withPath("/df1.parquet")
+ .withPartitionPath("c2=a/c3=a")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(SPEC)
+ .withPath("/df2.parquet")
+ .withPartitionPath("c2=b/c3=b")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    PartitionStatisticsFile statisticsFile =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable);
+    testTable.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
+
+    assertThat(
+            PartitionStatsHandler.readPartitionStatsFile(
+                PartitionStatsHandler.schema(Partitioning.partitionType(testTable)),
+                testTable.io().newInputFile(statisticsFile.path())))
+        .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0));
+
+ testTable.newDelete().deleteFile(dataFile1).commit();
+ testTable.newDelete().deleteFile(dataFile2).commit();
+
+    PartitionStatisticsFile statisticsFileNew =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable);
+
+    // stats must be decremented to zero as all the files removed from table.
+    assertThat(
+            PartitionStatsHandler.readPartitionStatsFile(
+                PartitionStatsHandler.schema(Partitioning.partitionType(testTable)),
+                testTable.io().newInputFile(statisticsFileNew.path())))
+        .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0));
+ }
+
@Test
public void testLatestStatsFile() throws Exception {
Table testTable =
@@ -476,6 +522,46 @@ public abstract class PartitionStatsHandlerTestBase {
assertThat(latestStatsFile).isEqualTo(statisticsFile);
}
+ @Test
+ public void testLatestStatsFileWithBranch() throws Exception {
+    Table testTable =
+        TestTables.create(
+            tempDir("stats_file_branch"), "stats_file_branch", SCHEMA, SPEC, 2, fileFormatProperty);
+    DataFile dataFile =
+        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A"));
+
+ /*
+ * [statsMainB]
+ ---- snapshotA ------ snapshotMainB
+ \
+ \
+ \
+ snapshotBranchB(branch:b1)
+ */
+
+ testTable.newAppend().appendFile(dataFile).commit();
+ long snapshotAId = testTable.currentSnapshot().snapshotId();
+
+ testTable.newAppend().appendFile(dataFile).commit();
+ long snapshotMainBId = testTable.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ testTable.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ testTable.newAppend().appendFile(dataFile).commit();
+ long snapshotBranchBId = testTable.snapshot(branchName).snapshotId();
+
+    PartitionStatisticsFile statsMainB =
+        PartitionStatsHandler.computeAndWriteStatsFile(testTable, snapshotMainBId);
+    testTable.updatePartitionStatistics().setPartitionStatistics(statsMainB).commit();
+
+    // should find latest stats for snapshotMainB
+    assertThat(PartitionStatsHandler.latestStatsFile(testTable, snapshotMainBId))
+        .isEqualTo(statsMainB);
+
+    // should not find latest stats for snapshotBranchB
+    assertThat(PartitionStatsHandler.latestStatsFile(testTable, snapshotBranchBId)).isNull();
+ }
+
private static StructLike partitionRecord(
Types.StructType partitionType, String val1, String val2) {
GenericRecord record = GenericRecord.create(partitionType);
@@ -496,7 +582,7 @@ public abstract class PartitionStatsHandlerTestBase {
List<PartitionStats> partitionStats;
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
- recordSchema, Files.localInput(result.path()))) {
+ recordSchema, testTable.io().newInputFile(result.path()))) {
partitionStats = Lists.newArrayList(recordIterator);
}
diff --git a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
index c26c004297..f91b86b234 100644
--- a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
+++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java
@@ -53,4 +53,18 @@ public class TestOrcPartitionStatsHandler extends PartitionStatsHandlerTestBase
         .isInstanceOf(UnsupportedOperationException.class)
         .hasMessage("Cannot write using unregistered internal data format: ORC");
   }
+
+  @Override
+  public void testLatestStatsFileWithBranch() throws Exception {
+    assertThatThrownBy(super::testLatestStatsFileWithBranch)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: ORC");
+  }
+
+  @Override
+  public void testCopyOnWriteDelete() throws Exception {
+    assertThatThrownBy(super::testCopyOnWriteDelete)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Cannot write using unregistered internal data format: ORC");
+  }
}