This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new e27679106 Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508) e27679106 is described below commit e276791068130e5812ffb0ca539dc41cf0233fca Author: danielhumanmod <danieltu.l...@gmail.com> AuthorDate: Fri May 9 14:45:22 2025 -0700 Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508) * cleaning up partition stats * update partition stat file extension * update test partition stat write impl --- .../task/BatchFileCleanupTaskHandlerTest.java | 41 ++++++++++++++-------- .../quarkus/task/TableCleanupTaskHandlerTest.java | 22 ++++++++++-- .../service/quarkus/task/TaskTestUtils.java | 40 +++++++++++++++++++-- .../service/task/TableCleanupTaskHandler.java | 15 ++++---- 4 files changed, 93 insertions(+), 25 deletions(-) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index b55ffbb9e..662f88bb0 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -124,10 +125,19 @@ public class BatchFileCleanupTaskHandlerTest { snapshot.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + 
"partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String firstMetadataFile = "v1-295495059.metadata.json"; TableMetadata firstMetadata = TaskTestUtils.writeTableMetadata( - fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); ManifestFile manifestFile3 = @@ -148,6 +158,11 @@ public class BatchFileCleanupTaskHandlerTest { snapshot2.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String secondMetadataFile = "v1-295495060.metadata.json"; TableMetadata secondMetadata = TaskTestUtils.writeTableMetadata( @@ -156,18 +171,19 @@ public class BatchFileCleanupTaskHandlerTest { firstMetadata, firstMetadataFile, List.of(statisticsFile2), + List.of(partitionStatisticsFile2), snapshot2); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); List<String> cleanupFiles = - Stream.concat( - secondMetadata.previousFiles().stream() - .map(TableMetadata.MetadataLogEntry::file) - .filter(file -> TaskUtils.exists(file, fileIO)), - secondMetadata.statisticsFiles().stream() - .map(StatisticsFile::path) - .filter(file -> TaskUtils.exists(file, fileIO))) + Stream.of( + secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), + secondMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) + .filter(file -> TaskUtils.exists(file, fileIO)) .toList(); TaskEntity task = @@ -183,12 +199,9 @@ public class BatchFileCleanupTaskHandlerTest { 
assertThatPredicate(handler::canHandleTask).accepts(task); assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(firstMetadataFile); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile1.path()); - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)) - .rejects(statisticsFile2.path()); + for (String cleanupFile : cleanupFiles) { + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); + } } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 2bb53b40c..5e39028c9 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -33,6 +33,7 @@ import java.util.UUID; import org.apache.commons.codec.binary.Base64; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -519,10 +520,19 @@ class TableCleanupTaskHandlerTest { snapshot.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String firstMetadataFile = "v1-295495059.metadata.json"; TableMetadata firstMetadata = TaskTestUtils.writeTableMetadata( - fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot); + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + 
List.of(partitionStatisticsFile1), + snapshot); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); ManifestFile manifestFile3 = @@ -543,6 +553,11 @@ class TableCleanupTaskHandlerTest { snapshot2.sequenceNumber(), "/metadata/" + UUID.randomUUID() + ".stats", fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); String secondMetadataFile = "v1-295495060.metadata.json"; TaskTestUtils.writeTableMetadata( fileIO, @@ -550,6 +565,7 @@ class TableCleanupTaskHandlerTest { firstMetadata, firstMetadataFile, List.of(statisticsFile2), + List.of(partitionStatisticsFile2), snapshot2); assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); @@ -609,7 +625,9 @@ class TableCleanupTaskHandlerTest { snapshot.manifestListLocation(), snapshot2.manifestListLocation(), statisticsFile1.path(), - statisticsFile2.path())), + statisticsFile2.path(), + partitionStatisticsFile1.path(), + partitionStatisticsFile2.path())), entity -> entity.readData( BatchFileCleanupTaskHandler.BatchFileCleanupTask.class))); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java index 83f32f9b3..a4a00060c 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java @@ -20,6 +20,7 @@ package org.apache.polaris.service.quarkus.task; import jakarta.annotation.Nonnull; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -30,10 +31,12 @@ import org.apache.iceberg.DataFiles; import 
org.apache.iceberg.FileFormat; import org.apache.iceberg.GenericBlobMetadata; import org.apache.iceberg.GenericStatisticsFile; +import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; @@ -71,7 +74,7 @@ public class TaskTestUtils { static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots) throws IOException { - return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots); + return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots); } static TableMetadata writeTableMetadata( @@ -80,7 +83,18 @@ public class TaskTestUtils { List<StatisticsFile> statisticsFiles, Snapshot... snapshots) throws IOException { - return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots); + return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots); + } + + static TableMetadata writeTableMetadata( + FileIO fileIO, + String metadataFile, + List<StatisticsFile> statisticsFiles, + List<PartitionStatisticsFile> partitionStatsFiles, + Snapshot... snapshots) + throws IOException { + return writeTableMetadata( + fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots); } static TableMetadata writeTableMetadata( @@ -89,6 +103,7 @@ public class TaskTestUtils { TableMetadata prevMetadata, String prevMetadataFile, List<StatisticsFile> statisticsFiles, + List<PartitionStatisticsFile> partitionStatsFiles, Snapshot... 
snapshots) throws IOException { TableMetadata.Builder tmBuilder; @@ -106,11 +121,15 @@ public class TaskTestUtils { .addPartitionSpec(PartitionSpec.unpartitioned()); int statisticsFileIndex = 0; + int partitionStatsFileIndex = 0; for (Snapshot snapshot : snapshots) { tmBuilder.addSnapshot(snapshot); if (statisticsFiles != null) { tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++)); } + if (partitionStatsFiles != null) { + tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++)); + } } TableMetadata tableMetadata = tmBuilder.build(); PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite(); @@ -161,4 +180,21 @@ public class TaskTestUtils { puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList()); } } + + public static PartitionStatisticsFile writePartitionStatsFile( + long snapshotId, String statsLocation, FileIO fileIO) throws UncheckedIOException { + PositionOutputStream positionOutputStream; + try { + positionOutputStream = fileIO.newOutputFile(statsLocation).create(); + positionOutputStream.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return ImmutableGenericPartitionStatisticsFile.builder() + .snapshotId(snapshotId) + .path(statsLocation) + .fileSizeInBytes(42L) + .build(); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index f9f1c2f35..ff791bf18 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -25,6 +25,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionStatisticsFile; import 
org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; import org.apache.iceberg.TableMetadata; @@ -112,7 +113,6 @@ public class TableCleanupTaskHandler implements TaskHandler { metaStoreManager, polarisCallContext); - // TODO: handle partition statistics files Stream<TaskEntity> metadataFileCleanupTasks = getMetadataTaskStream( cleanupTask, @@ -243,12 +243,13 @@ public class TableCleanupTaskHandler implements TaskHandler { private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) { List<List<String>> result = new ArrayList<>(); List<String> metadataFiles = - Stream.concat( - Stream.concat( - tableMetadata.previousFiles().stream() - .map(TableMetadata.MetadataLogEntry::file), - tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)), - tableMetadata.statisticsFiles().stream().map(StatisticsFile::path)) + Stream.of( + tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation), + tableMetadata.statisticsFiles().stream().map(StatisticsFile::path), + tableMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) .toList(); for (int i = 0; i < metadataFiles.size(); i += batchSize) {