This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 7f81e1e930 Core: Use scan API to read partition stats (#14989)
7f81e1e930 is described below
commit 7f81e1e93084e50fa3676c2e131722f66a26b385
Author: gaborkaszab <[email protected]>
AuthorDate: Mon Jan 12 14:14:03 2026 +0100
Core: Use scan API to read partition stats (#14989)
---
.../org/apache/iceberg/PartitionStatistics.java | 15 ++
.../apache/iceberg/BasePartitionStatistics.java | 70 +++---
.../org/apache/iceberg/PartitionStatsHandler.java | 252 +++++++++++++++++----
.../iceberg/PartitionStatisticsTestBase.java | 26 +--
.../iceberg/PartitionStatsHandlerTestBase.java | 67 ++++--
5 files changed, 325 insertions(+), 105 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
index 10df7303d5..c0c4c07b7e 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
@@ -21,6 +21,21 @@ package org.apache.iceberg;
/** Interface for partition statistics returned from a {@link
PartitionStatisticsScan}. */
public interface PartitionStatistics extends StructLike {
+ /* The positions of each statistics within the full schema of partition
statistics. */
+ int PARTITION_POSITION = 0;
+ int SPEC_ID_POSITION = 1;
+ int DATA_RECORD_COUNT_POSITION = 2;
+ int DATA_FILE_COUNT_POSITION = 3;
+ int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
+ int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
+ int POSITION_DELETE_FILE_COUNT_POSITION = 6;
+ int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
+ int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
+ int TOTAL_RECORD_COUNT_POSITION = 9;
+ int LAST_UPDATED_AT_POSITION = 10;
+ int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
+ int DV_COUNT_POSITION = 12;
+
/** Returns the partition of these partition statistics */
StructLike partition();
diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
index c17718281b..4b1a3a6dba 100644
--- a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
@@ -40,8 +40,24 @@ public class BasePartitionStatistics extends
SupportsIndexProjection
private static final int STATS_COUNT = 13;
+ BasePartitionStatistics(StructLike partition, int specId) {
+ super(STATS_COUNT);
+
+ this.partition = partition;
+ this.specId = specId;
+
+ this.dataRecordCount = 0L;
+ this.dataFileCount = 0;
+ this.totalDataFileSizeInBytes = 0L;
+ this.positionDeleteRecordCount = 0L;
+ this.positionDeleteFileCount = 0;
+ this.equalityDeleteRecordCount = 0L;
+ this.equalityDeleteFileCount = 0;
+ this.dvCount = 0;
+ }
+
/** Used by internal readers to instantiate this class with a projection
schema. */
- public BasePartitionStatistics(Types.StructType projection) {
+ BasePartitionStatistics(Types.StructType projection) {
super(STATS_COUNT);
}
@@ -117,31 +133,31 @@ public class BasePartitionStatistics extends
SupportsIndexProjection
private Object getByPos(int pos) {
switch (pos) {
- case 0:
+ case PARTITION_POSITION:
return partition;
- case 1:
+ case SPEC_ID_POSITION:
return specId;
- case 2:
+ case DATA_RECORD_COUNT_POSITION:
return dataRecordCount;
- case 3:
+ case DATA_FILE_COUNT_POSITION:
return dataFileCount;
- case 4:
+ case TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION:
return totalDataFileSizeInBytes;
- case 5:
+ case POSITION_DELETE_RECORD_COUNT_POSITION:
return positionDeleteRecordCount;
- case 6:
+ case POSITION_DELETE_FILE_COUNT_POSITION:
return positionDeleteFileCount;
- case 7:
+ case EQUALITY_DELETE_RECORD_COUNT_POSITION:
return equalityDeleteRecordCount;
- case 8:
+ case EQUALITY_DELETE_FILE_COUNT_POSITION:
return equalityDeleteFileCount;
- case 9:
+ case TOTAL_RECORD_COUNT_POSITION:
return totalRecordCount;
- case 10:
+ case LAST_UPDATED_AT_POSITION:
return lastUpdatedAt;
- case 11:
+ case LAST_UPDATED_SNAPSHOT_ID_POSITION:
return lastUpdatedSnapshotId;
- case 12:
+ case DV_COUNT_POSITION:
return dvCount;
default:
throw new UnsupportedOperationException("Unknown position: " + pos);
@@ -155,43 +171,43 @@ public class BasePartitionStatistics extends
SupportsIndexProjection
}
switch (pos) {
- case 0:
+ case PARTITION_POSITION:
this.partition = (StructLike) value;
break;
- case 1:
+ case SPEC_ID_POSITION:
this.specId = (int) value;
break;
- case 2:
+ case DATA_RECORD_COUNT_POSITION:
this.dataRecordCount = (long) value;
break;
- case 3:
+ case DATA_FILE_COUNT_POSITION:
this.dataFileCount = (int) value;
break;
- case 4:
+ case TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION:
this.totalDataFileSizeInBytes = (long) value;
break;
- case 5:
+ case POSITION_DELETE_RECORD_COUNT_POSITION:
this.positionDeleteRecordCount = (long) value;
break;
- case 6:
+ case POSITION_DELETE_FILE_COUNT_POSITION:
this.positionDeleteFileCount = (int) value;
break;
- case 7:
+ case EQUALITY_DELETE_RECORD_COUNT_POSITION:
this.equalityDeleteRecordCount = (long) value;
break;
- case 8:
+ case EQUALITY_DELETE_FILE_COUNT_POSITION:
this.equalityDeleteFileCount = (int) value;
break;
- case 9:
+ case TOTAL_RECORD_COUNT_POSITION:
this.totalRecordCount = (Long) value;
break;
- case 10:
+ case LAST_UPDATED_AT_POSITION:
this.lastUpdatedAt = (Long) value;
break;
- case 11:
+ case LAST_UPDATED_SNAPSHOT_ID_POSITION:
this.lastUpdatedSnapshotId = (Long) value;
break;
- case 12:
+ case DV_COUNT_POSITION:
this.dvCount = (int) value;
break;
default:
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 7259a1f068..acf4ca3c0c 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -208,9 +208,7 @@ public class PartitionStatsHandler {
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s",
snapshotId);
- StructType partitionType = Partitioning.partitionType(table);
-
- Collection<PartitionStats> stats;
+ Collection<PartitionStatistics> stats;
PartitionStatisticsFile statisticsFile = latestStatsFile(table,
snapshot.snapshotId());
if (statisticsFile == null) {
LOG.info(
@@ -225,7 +223,7 @@ public class PartitionStatsHandler {
}
try {
- stats = computeAndMergeStatsIncremental(table, snapshot,
partitionType, statisticsFile);
+ stats = computeAndMergeStatsIncremental(table, snapshot,
statisticsFile.snapshotId());
} catch (InvalidStatsFileException exception) {
LOG.warn(
"Using full compute as previous statistics file is corrupted for
incremental compute.");
@@ -240,7 +238,8 @@ public class PartitionStatsHandler {
return null;
}
- List<PartitionStats> sortedStats = sortStatsByPartition(stats,
partitionType);
+ StructType partitionType = Partitioning.partitionType(table);
+ List<PartitionStatistics> sortedStats = sortStatsByPartition(stats,
partitionType);
return writePartitionStatsFile(
table,
snapshot.snapshotId(),
@@ -250,7 +249,7 @@ public class PartitionStatsHandler {
@VisibleForTesting
static PartitionStatisticsFile writePartitionStatsFile(
- Table table, long snapshotId, Schema dataSchema,
Iterable<PartitionStats> records)
+ Table table, long snapshotId, Schema dataSchema,
Iterable<PartitionStatistics> records)
throws IOException {
FileFormat fileFormat =
FileFormat.fromString(
@@ -322,17 +321,11 @@ public class PartitionStatsHandler {
return stats;
}
- private static Collection<PartitionStats> computeAndMergeStatsIncremental(
- Table table,
- Snapshot snapshot,
- StructType partitionType,
- PartitionStatisticsFile previousStatsFile) {
- PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
- // read previous stats, note that partition field will be read as
GenericRecord
- try (CloseableIterable<PartitionStats> oldStats =
- readPartitionStatsFile(
- schema(partitionType, TableUtil.formatVersion(table)),
- table.io().newInputFile(previousStatsFile.path()))) {
+ private static Collection<PartitionStatistics>
computeAndMergeStatsIncremental(
+ Table table, Snapshot snapshot, long lastSnapshotWithStats) {
+ PartitionMap<PartitionStatistics> statsMap =
PartitionMap.create(table.specs());
+ try (CloseableIterable<PartitionStatistics> oldStats =
+
table.newPartitionStatisticsScan().useSnapshot(lastSnapshotWithStats).scan()) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(),
partitionStats.partition(), partitionStats));
@@ -341,8 +334,8 @@ public class PartitionStatsHandler {
}
// incrementally compute the new stats, partition field will be written as
PartitionData
- PartitionMap<PartitionStats> incrementalStatsMap =
- computeStatsDiff(table,
table.snapshot(previousStatsFile.snapshotId()), snapshot);
+ PartitionMap<PartitionStatistics> incrementalStatsMap =
+ computeStatsDiff(table, table.snapshot(lastSnapshotWithStats),
snapshot);
// convert PartitionData into GenericRecord and merge stats
incrementalStatsMap.forEach(
@@ -351,7 +344,7 @@ public class PartitionStatsHandler {
Pair.of(key.first(), partitionDataToRecord((PartitionData)
key.second())),
value,
(existingEntry, newEntry) -> {
- existingEntry.appendStats(newEntry);
+ appendStats(existingEntry, newEntry);
return existingEntry;
}));
@@ -389,7 +382,7 @@ public class PartitionStatsHandler {
return null;
}
- private static PartitionMap<PartitionStats> computeStatsDiff(
+ private static PartitionMap<PartitionStatistics> computeStatsDiff(
Table table, Snapshot fromSnapshot, Snapshot toSnapshot) {
Iterable<Snapshot> snapshots =
SnapshotUtil.ancestorsBetween(
@@ -408,10 +401,10 @@ public class PartitionStatsHandler {
return computeStats(table, manifests, true /* incremental */);
}
- private static PartitionMap<PartitionStats> computeStats(
+ private static PartitionMap<PartitionStatistics> computeStats(
Table table, List<ManifestFile> manifests, boolean incremental) {
StructType partitionType = Partitioning.partitionType(table);
- Queue<PartitionMap<PartitionStats>> statsByManifest =
Queues.newConcurrentLinkedQueue();
+ Queue<PartitionMap<PartitionStatistics>> statsByManifest =
Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
.stopOnFailure()
.throwFailureWhenFinished()
@@ -421,19 +414,19 @@ public class PartitionStatsHandler {
statsByManifest.add(
collectStatsForManifest(table, manifest, partitionType,
incremental)));
- PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
- for (PartitionMap<PartitionStats> stats : statsByManifest) {
+ PartitionMap<PartitionStatistics> statsMap =
PartitionMap.create(table.specs());
+ for (PartitionMap<PartitionStatistics> stats : statsByManifest) {
mergePartitionMap(stats, statsMap);
}
return statsMap;
}
- private static PartitionMap<PartitionStats> collectStatsForManifest(
+ private static PartitionMap<PartitionStatistics> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType, boolean
incremental) {
List<String> projection = BaseScan.scanColumns(manifest.content());
try (ManifestReader<?> reader = ManifestFiles.open(manifest,
table.io()).select(projection)) {
- PartitionMap<PartitionStats> statsMap =
PartitionMap.create(table.specs());
+ PartitionMap<PartitionStatistics> statsMap =
PartitionMap.create(table.specs());
int specId = manifest.partitionSpecId();
PartitionSpec spec = table.specs().get(specId);
PartitionData keyTemplate = new PartitionData(partitionType);
@@ -444,22 +437,22 @@ public class PartitionStatsHandler {
PartitionUtil.coercePartition(partitionType, spec,
file.partition());
StructLike key = keyTemplate.copyFor(coercedPartition);
Snapshot snapshot = table.snapshot(entry.snapshotId());
- PartitionStats stats =
+ PartitionStatistics stats =
statsMap.computeIfAbsent(
specId,
((PartitionData) file.partition()).copy(),
- () -> new PartitionStats(key, specId));
+ () -> new BasePartitionStatistics(key, specId));
if (entry.isLive()) {
// Live can have both added and existing entries. Consider only
added entries for
// incremental compute as existing entries was already included in
previous compute.
if (!incremental || entry.status() == ManifestEntry.Status.ADDED) {
- stats.liveEntry(file, snapshot);
+ liveEntry(stats, file, snapshot);
}
} else {
if (incremental) {
- stats.deletedEntryForIncrementalCompute(file, snapshot);
+ deletedEntryForIncrementalCompute(stats, file, snapshot);
} else {
- stats.deletedEntry(snapshot);
+ deletedEntry(stats, snapshot);
}
}
}
@@ -471,26 +464,209 @@ public class PartitionStatsHandler {
}
private static void mergePartitionMap(
- PartitionMap<PartitionStats> fromMap, PartitionMap<PartitionStats>
toMap) {
+ PartitionMap<PartitionStatistics> fromMap,
PartitionMap<PartitionStatistics> toMap) {
fromMap.forEach(
(key, value) ->
toMap.merge(
key,
value,
(existingEntry, newEntry) -> {
- existingEntry.appendStats(newEntry);
+ appendStats(existingEntry, newEntry);
return existingEntry;
}));
}
- private static List<PartitionStats> sortStatsByPartition(
- Collection<PartitionStats> stats, StructType partitionType) {
- List<PartitionStats> entries = Lists.newArrayList(stats);
+ private static List<PartitionStatistics> sortStatsByPartition(
+ Collection<PartitionStatistics> stats, StructType partitionType) {
+ List<PartitionStatistics> entries = Lists.newArrayList(stats);
entries.sort(
- Comparator.comparing(PartitionStats::partition,
Comparators.forType(partitionType)));
+ Comparator.comparing(PartitionStatistics::partition,
Comparators.forType(partitionType)));
return entries;
}
+ /**
+ * Updates the partition stats from the data/delete file.
+ *
+ * @param stats partition statistics to be updated.
+ * @param file the {@link ContentFile} from the manifest entry.
+ * @param snapshot the snapshot corresponding to the live entry.
+ */
+ private static void liveEntry(PartitionStatistics stats, ContentFile<?>
file, Snapshot snapshot) {
+ Preconditions.checkArgument(stats.specId() == file.specId(), "Spec IDs
must match");
+
+ switch (file.content()) {
+ case DATA:
+ stats.set(
+ PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+ stats.dataRecordCount() + file.recordCount());
+ stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION,
stats.dataFileCount() + 1);
+ stats.set(
+ PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+ stats.totalDataFileSizeInBytes() + file.fileSizeInBytes());
+ break;
+ case POSITION_DELETES:
+ stats.set(
+ PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+ stats.positionDeleteRecordCount() + file.recordCount());
+ if (file.format() == FileFormat.PUFFIN) {
+ stats.set(PartitionStatistics.DV_COUNT_POSITION, stats.dvCount() +
1);
+ } else {
+ stats.set(
+ PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+ stats.positionDeleteFileCount() + 1);
+ }
+
+ break;
+ case EQUALITY_DELETES:
+ stats.set(
+ PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+ stats.equalityDeleteRecordCount() + file.recordCount());
+ stats.set(
+ PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+ stats.equalityDeleteFileCount() + 1);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported file content
type: " + file.content());
+ }
+
+ if (snapshot != null) {
+ updateSnapshotInfo(stats, snapshot.snapshotId(),
snapshot.timestampMillis());
+ }
+
+ // Note: Not computing the `TOTAL_RECORD_COUNT` for now as it needs
scanning the data.
+ }
+
+ /**
+ * Updates the modified time and snapshot ID in stats for the deleted
manifest entry.
+ *
+ * @param stats partition statistics to be updated.
+ * @param snapshot the snapshot corresponding to the deleted manifest entry.
+ */
+ private static void deletedEntry(PartitionStatistics stats, Snapshot
snapshot) {
+ if (snapshot != null) {
+ updateSnapshotInfo(stats, snapshot.snapshotId(),
snapshot.timestampMillis());
+ }
+ }
+
+  /**
+   * Decrements the counters in stats, as the file was included in the
previous stats, and updates
+   * the modified time and snapshot ID for the deleted manifest entry.
+   *
+   * @param stats partition statistics to be updated.
+   * @param file the {@link ContentFile} from the deleted manifest entry.
+   * @param snapshot the snapshot corresponding to the deleted manifest entry.
+   */
+ private static void deletedEntryForIncrementalCompute(
+ PartitionStatistics stats, ContentFile<?> file, Snapshot snapshot) {
+ Preconditions.checkArgument(stats.specId() == file.specId(), "Spec IDs
must match");
+
+ switch (file.content()) {
+ case DATA:
+ stats.set(
+ PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+ stats.dataRecordCount() - file.recordCount());
+ stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION,
stats.dataFileCount() - 1);
+ stats.set(
+ PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+ stats.totalDataFileSizeInBytes() - file.fileSizeInBytes());
+ break;
+ case POSITION_DELETES:
+ stats.set(
+ PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+ stats.positionDeleteRecordCount() - file.recordCount());
+ if (file.format() == FileFormat.PUFFIN) {
+ stats.set(PartitionStatistics.DV_COUNT_POSITION, stats.dvCount() -
1);
+ } else {
+ stats.set(
+ PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+ stats.positionDeleteFileCount() - 1);
+ }
+
+ break;
+ case EQUALITY_DELETES:
+ stats.set(
+ PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+ stats.equalityDeleteRecordCount() - file.recordCount());
+ stats.set(
+ PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+ stats.equalityDeleteFileCount() - 1);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported file content
type: " + file.content());
+ }
+
+ if (snapshot != null) {
+ updateSnapshotInfo(stats, snapshot.snapshotId(),
snapshot.timestampMillis());
+ }
+ }
+
+  /**
+   * Appends the statistics from the given input entry to the target entry.
+   *
+   * @param targetStats partition statistics to be updated.
+   * @param inputStats the partition statistics used as input.
+   */
+ private static void appendStats(PartitionStatistics targetStats,
PartitionStatistics inputStats) {
+ Preconditions.checkArgument(targetStats.specId() != null, "Invalid spec
ID: null");
+ Preconditions.checkArgument(
+ targetStats.specId().equals(inputStats.specId()), "Spec IDs must
match");
+
+ // This is expected to be called on the compute/write path where we use
full schemas, hence
+ // these members can't be null.
+ targetStats.set(
+ PartitionStatistics.DATA_RECORD_COUNT_POSITION,
+ targetStats.dataRecordCount() + inputStats.dataRecordCount());
+ targetStats.set(
+ PartitionStatistics.DATA_FILE_COUNT_POSITION,
+ targetStats.dataFileCount() + inputStats.dataFileCount());
+ targetStats.set(
+ PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION,
+ targetStats.totalDataFileSizeInBytes() +
inputStats.totalDataFileSizeInBytes());
+ targetStats.set(
+ PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
+ targetStats.positionDeleteRecordCount() +
inputStats.positionDeleteRecordCount());
+ targetStats.set(
+ PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION,
+ targetStats.positionDeleteFileCount() +
inputStats.positionDeleteFileCount());
+ targetStats.set(
+ PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
+ targetStats.equalityDeleteRecordCount() +
inputStats.equalityDeleteRecordCount());
+ targetStats.set(
+ PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION,
+ targetStats.equalityDeleteFileCount() +
inputStats.equalityDeleteFileCount());
+
+ if (inputStats.dvCount() != null) {
+ if (targetStats.dvCount() == null) {
+ targetStats.set(PartitionStatistics.DV_COUNT_POSITION,
inputStats.dvCount());
+ } else {
+ targetStats.set(
+ PartitionStatistics.DV_COUNT_POSITION, targetStats.dvCount() +
inputStats.dvCount());
+ }
+ }
+
+ if (inputStats.totalRecords() != null) {
+ if (targetStats.totalRecords() == null) {
+ targetStats.set(PartitionStatistics.TOTAL_RECORD_COUNT_POSITION,
inputStats.totalRecords());
+ } else {
+ targetStats.set(
+ PartitionStatistics.TOTAL_RECORD_COUNT_POSITION,
+ targetStats.totalRecords() + inputStats.totalRecords());
+ }
+ }
+
+ if (inputStats.lastUpdatedAt() != null) {
+ updateSnapshotInfo(
+ targetStats, inputStats.lastUpdatedSnapshotId(),
inputStats.lastUpdatedAt());
+ }
+ }
+
+ private static void updateSnapshotInfo(
+ PartitionStatistics stats, long snapshotId, long updatedAt) {
+ if (stats.lastUpdatedAt() == null || stats.lastUpdatedAt() < updatedAt) {
+ stats.set(PartitionStatistics.LAST_UPDATED_AT_POSITION, updatedAt);
+ stats.set(PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID_POSITION,
snapshotId);
+ }
+ }
+
private static class InvalidStatsFileException extends RuntimeException {
InvalidStatsFileException(Throwable cause) {
diff --git
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
index 72a5405d7f..f6326e228c 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
@@ -44,19 +44,6 @@ public abstract class PartitionStatisticsTestBase {
@TempDir private File temp;
- // positions in StructLike
- protected static final int DATA_RECORD_COUNT_POSITION = 2;
- protected static final int DATA_FILE_COUNT_POSITION = 3;
- protected static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
- protected static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
- protected static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
- protected static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
- protected static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
- protected static final int TOTAL_RECORD_COUNT_POSITION = 9;
- protected static final int LAST_UPDATED_AT_POSITION = 10;
- protected static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
- protected static final int DV_COUNT_POSITION = 12;
-
protected static final Schema SCHEMA =
new Schema(
optional(1, "c1", Types.IntegerType.get()),
@@ -85,18 +72,19 @@ public abstract class PartitionStatisticsTestBase {
Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(),
Types.LongType.get()));
}
- protected PartitionStats randomStats(Types.StructType partitionType) {
+ protected PartitionStatistics randomStats(Types.StructType partitionType) {
PartitionData partitionData = new PartitionData(partitionType);
partitionData.set(0, RANDOM.nextInt());
return randomStats(partitionData);
}
- protected PartitionStats randomStats(PartitionData partitionData) {
- PartitionStats stats = new PartitionStats(partitionData,
RANDOM.nextInt(10));
- stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
- stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
- stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
+ protected PartitionStatistics randomStats(PartitionData partitionData) {
+ PartitionStatistics stats = new BasePartitionStatistics(partitionData,
RANDOM.nextInt(10));
+ stats.set(PartitionStatistics.DATA_RECORD_COUNT_POSITION,
RANDOM.nextLong());
+ stats.set(PartitionStatistics.DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
+ stats.set(
+ PartitionStatistics.TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
return stats;
}
diff --git
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index 9b93013a9b..fd1e5ffe29 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -180,8 +180,8 @@ public abstract class PartitionStatsHandlerTestBase extends
PartitionStatisticsT
partitionData.set(13, new BigDecimal("12345678901234567890.1234567890"));
partitionData.set(14,
Literal.of("10:10:10").to(Types.TimeType.get()).value());
- PartitionStats partitionStats = randomStats(partitionData);
- List<PartitionStats> expected = Collections.singletonList(partitionStats);
+ PartitionStatistics partitionStats = randomStats(partitionData);
+ List<PartitionStatistics> expected =
Collections.singletonList(partitionStats);
PartitionStatisticsFile statisticsFile =
PartitionStatsHandler.writePartitionStatsFile(testTable, 42L,
dataSchema, expected);
@@ -214,34 +214,34 @@ public abstract class PartitionStatsHandlerTestBase
extends PartitionStatisticsT
Types.StructType partitionSchema = Partitioning.partitionType(testTable);
Schema dataSchema = PartitionStatsHandler.schema(partitionSchema,
formatVersion);
- ImmutableList.Builder<PartitionStats> partitionListBuilder =
ImmutableList.builder();
+ ImmutableList.Builder<PartitionStatistics> partitionListBuilder =
ImmutableList.builder();
for (int i = 0; i < 5; i++) {
- PartitionStats stats =
+ PartitionStatistics stats =
randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
- stats.set(POSITION_DELETE_RECORD_COUNT_POSITION, null);
- stats.set(POSITION_DELETE_FILE_COUNT_POSITION, null);
- stats.set(EQUALITY_DELETE_RECORD_COUNT_POSITION, null);
- stats.set(EQUALITY_DELETE_FILE_COUNT_POSITION, null);
- stats.set(TOTAL_RECORD_COUNT_POSITION, null);
- stats.set(LAST_UPDATED_AT_POSITION, null);
- stats.set(LAST_UPDATED_SNAPSHOT_ID_POSITION, null);
- stats.set(DV_COUNT_POSITION, null);
+ stats.set(PartitionStatistics.POSITION_DELETE_RECORD_COUNT_POSITION,
null);
+ stats.set(PartitionStatistics.POSITION_DELETE_FILE_COUNT_POSITION, null);
+ stats.set(PartitionStatistics.EQUALITY_DELETE_RECORD_COUNT_POSITION,
null);
+ stats.set(PartitionStatistics.EQUALITY_DELETE_FILE_COUNT_POSITION, null);
+ stats.set(PartitionStatistics.TOTAL_RECORD_COUNT_POSITION, null);
+ stats.set(PartitionStatistics.LAST_UPDATED_AT_POSITION, null);
+ stats.set(PartitionStatistics.LAST_UPDATED_SNAPSHOT_ID_POSITION, null);
+ stats.set(PartitionStatistics.DV_COUNT_POSITION, null);
partitionListBuilder.add(stats);
}
- List<PartitionStats> expected = partitionListBuilder.build();
+ List<PartitionStatistics> expected = partitionListBuilder.build();
assertThat(expected.get(0))
.extracting(
- PartitionStats::positionDeleteRecordCount,
- PartitionStats::positionDeleteFileCount,
- PartitionStats::equalityDeleteRecordCount,
- PartitionStats::equalityDeleteFileCount,
- PartitionStats::totalRecords,
- PartitionStats::lastUpdatedAt,
- PartitionStats::lastUpdatedSnapshotId,
- PartitionStats::dvCount)
+ PartitionStatistics::positionDeleteRecordCount,
+ PartitionStatistics::positionDeleteFileCount,
+ PartitionStatistics::equalityDeleteRecordCount,
+ PartitionStatistics::equalityDeleteFileCount,
+ PartitionStatistics::totalRecords,
+ PartitionStatistics::lastUpdatedAt,
+ PartitionStatistics::lastUpdatedSnapshotId,
+ PartitionStatistics::dvCount)
.isEqualTo(
Arrays.asList(
0L, 0, 0L, 0, null, null, null, 0)); // null counters must be
initialized to zero.
@@ -734,4 +734,29 @@ public abstract class PartitionStatsHandlerTestBase
extends PartitionStatisticsT
&& Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
&& Objects.equals(stats1.lastUpdatedSnapshotId(),
stats2.lastUpdatedSnapshotId());
}
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private static boolean isEqual(
+ Comparator<StructLike> partitionComparator,
+ PartitionStats stats1,
+ PartitionStatistics stats2) {
+ if (stats1 == stats2) {
+ return true;
+ } else if (stats1 == null || stats2 == null) {
+ return false;
+ }
+
+ return partitionComparator.compare(stats1.partition(), stats2.partition())
== 0
+ && stats1.specId() == stats2.specId()
+ && stats1.dataRecordCount() == stats2.dataRecordCount()
+ && stats1.dataFileCount() == stats2.dataFileCount()
+ && stats1.totalDataFileSizeInBytes() ==
stats2.totalDataFileSizeInBytes()
+ && stats1.positionDeleteRecordCount() ==
stats2.positionDeleteRecordCount()
+ && stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount()
+ && stats1.equalityDeleteRecordCount() ==
stats2.equalityDeleteRecordCount()
+ && stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount()
+ && Objects.equals(stats1.totalRecords(), stats2.totalRecords())
+ && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
+ && Objects.equals(stats1.lastUpdatedSnapshotId(),
stats2.lastUpdatedSnapshotId());
+ }
}