This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new e27679106 Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508)
e27679106 is described below

commit e276791068130e5812ffb0ca539dc41cf0233fca
Author: danielhumanmod <danieltu.l...@gmail.com>
AuthorDate: Fri May 9 14:45:22 2025 -0700

    Add cleanup support for partition-level statistics files when `DROP TABLE PURGE` (#1508)
    
    * cleaning up partition stats
    
    * update partition stat file extension
    
    * update test partition stat write impl
---
 .../task/BatchFileCleanupTaskHandlerTest.java      | 41 ++++++++++++++--------
 .../quarkus/task/TableCleanupTaskHandlerTest.java  | 22 ++++++++++--
 .../service/quarkus/task/TaskTestUtils.java        | 40 +++++++++++++++++++--
 .../service/task/TableCleanupTaskHandler.java      | 15 ++++----
 4 files changed, 93 insertions(+), 25 deletions(-)

diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
index b55ffbb9e..662f88bb0 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
@@ -124,10 +125,19 @@ public class BatchFileCleanupTaskHandlerTest {
               snapshot.sequenceNumber(),
               "/metadata/" + UUID.randomUUID() + ".stats",
               fileIO);
+      PartitionStatisticsFile partitionStatisticsFile1 =
+          TaskTestUtils.writePartitionStatsFile(
+              snapshot.snapshotId(),
+              "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+              fileIO);
       String firstMetadataFile = "v1-295495059.metadata.json";
       TableMetadata firstMetadata =
           TaskTestUtils.writeTableMetadata(
-              fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+              fileIO,
+              firstMetadataFile,
+              List.of(statisticsFile1),
+              List.of(partitionStatisticsFile1),
+              snapshot);
       assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
 
       ManifestFile manifestFile3 =
@@ -148,6 +158,11 @@ public class BatchFileCleanupTaskHandlerTest {
               snapshot2.sequenceNumber(),
               "/metadata/" + UUID.randomUUID() + ".stats",
               fileIO);
+      PartitionStatisticsFile partitionStatisticsFile2 =
+          TaskTestUtils.writePartitionStatsFile(
+              snapshot2.snapshotId(),
+              "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+              fileIO);
       String secondMetadataFile = "v1-295495060.metadata.json";
       TableMetadata secondMetadata =
           TaskTestUtils.writeTableMetadata(
@@ -156,18 +171,19 @@ public class BatchFileCleanupTaskHandlerTest {
               firstMetadata,
               firstMetadataFile,
               List.of(statisticsFile2),
+              List.of(partitionStatisticsFile2),
               snapshot2);
       assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
       assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
 
       List<String> cleanupFiles =
-          Stream.concat(
-                  secondMetadata.previousFiles().stream()
-                      .map(TableMetadata.MetadataLogEntry::file)
-                      .filter(file -> TaskUtils.exists(file, fileIO)),
-                  secondMetadata.statisticsFiles().stream()
-                      .map(StatisticsFile::path)
-                      .filter(file -> TaskUtils.exists(file, fileIO)))
+          Stream.of(
+                  secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+                  secondMetadata.statisticsFiles().stream().map(StatisticsFile::path),
+                  secondMetadata.partitionStatisticsFiles().stream()
+                      .map(PartitionStatisticsFile::path))
+              .flatMap(s -> s)
+              .filter(file -> TaskUtils.exists(file, fileIO))
               .toList();
 
       TaskEntity task =
@@ -183,12 +199,9 @@ public class BatchFileCleanupTaskHandlerTest {
       assertThatPredicate(handler::canHandleTask).accepts(task);
       assertThat(handler.handleTask(task, callCtx)).isTrue();
 
-      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
-          .rejects(firstMetadataFile);
-      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
-          .rejects(statisticsFile1.path());
-      assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO))
-          .rejects(statisticsFile2.path());
+      for (String cleanupFile : cleanupFiles) {
+        assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile);
+      }
     }
   }
 
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
index 2bb53b40c..5e39028c9 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java
@@ -33,6 +33,7 @@ import java.util.UUID;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
@@ -519,10 +520,19 @@ class TableCleanupTaskHandlerTest {
             snapshot.sequenceNumber(),
             "/metadata/" + UUID.randomUUID() + ".stats",
             fileIO);
+    PartitionStatisticsFile partitionStatisticsFile1 =
+        TaskTestUtils.writePartitionStatsFile(
+            snapshot.snapshotId(),
+            "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+            fileIO);
     String firstMetadataFile = "v1-295495059.metadata.json";
     TableMetadata firstMetadata =
         TaskTestUtils.writeTableMetadata(
-            fileIO, firstMetadataFile, List.of(statisticsFile1), snapshot);
+            fileIO,
+            firstMetadataFile,
+            List.of(statisticsFile1),
+            List.of(partitionStatisticsFile1),
+            snapshot);
     assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
 
     ManifestFile manifestFile3 =
@@ -543,6 +553,11 @@ class TableCleanupTaskHandlerTest {
             snapshot2.sequenceNumber(),
             "/metadata/" + UUID.randomUUID() + ".stats",
             fileIO);
+    PartitionStatisticsFile partitionStatisticsFile2 =
+        TaskTestUtils.writePartitionStatsFile(
+            snapshot2.snapshotId(),
+            "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet",
+            fileIO);
     String secondMetadataFile = "v1-295495060.metadata.json";
     TaskTestUtils.writeTableMetadata(
         fileIO,
@@ -550,6 +565,7 @@ class TableCleanupTaskHandlerTest {
         firstMetadata,
         firstMetadataFile,
         List.of(statisticsFile2),
+        List.of(partitionStatisticsFile2),
         snapshot2);
     assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue();
     assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue();
@@ -609,7 +625,9 @@ class TableCleanupTaskHandlerTest {
                                 snapshot.manifestListLocation(),
                                 snapshot2.manifestListLocation(),
                                 statisticsFile1.path(),
-                                statisticsFile2.path())),
+                                statisticsFile2.path(),
+                                partitionStatisticsFile1.path(),
+                                partitionStatisticsFile2.path())),
                         entity ->
                             entity.readData(
                                 BatchFileCleanupTaskHandler.BatchFileCleanupTask.class)));
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
index 83f32f9b3..a4a00060c 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.quarkus.task;
 
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -30,10 +31,12 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
@@ -71,7 +74,7 @@ public class TaskTestUtils {
 
   static TableMetadata writeTableMetadata(FileIO fileIO, String metadataFile, Snapshot... snapshots)
       throws IOException {
-    return writeTableMetadata(fileIO, metadataFile, null, null, null, snapshots);
+    return writeTableMetadata(fileIO, metadataFile, null, null, null, null, snapshots);
   }
 
   static TableMetadata writeTableMetadata(
@@ -80,7 +83,18 @@ public class TaskTestUtils {
       List<StatisticsFile> statisticsFiles,
       Snapshot... snapshots)
       throws IOException {
-    return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, snapshots);
+    return writeTableMetadata(fileIO, metadataFile, null, null, statisticsFiles, null, snapshots);
+  }
+
+  static TableMetadata writeTableMetadata(
+      FileIO fileIO,
+      String metadataFile,
+      List<StatisticsFile> statisticsFiles,
+      List<PartitionStatisticsFile> partitionStatsFiles,
+      Snapshot... snapshots)
+      throws IOException {
+    return writeTableMetadata(
+        fileIO, metadataFile, null, null, statisticsFiles, partitionStatsFiles, snapshots);
   }
 
   static TableMetadata writeTableMetadata(
@@ -89,6 +103,7 @@ public class TaskTestUtils {
       TableMetadata prevMetadata,
       String prevMetadataFile,
       List<StatisticsFile> statisticsFiles,
+      List<PartitionStatisticsFile> partitionStatsFiles,
       Snapshot... snapshots)
       throws IOException {
     TableMetadata.Builder tmBuilder;
@@ -106,11 +121,15 @@ public class TaskTestUtils {
         .addPartitionSpec(PartitionSpec.unpartitioned());
 
     int statisticsFileIndex = 0;
+    int partitionStatsFileIndex = 0;
     for (Snapshot snapshot : snapshots) {
       tmBuilder.addSnapshot(snapshot);
       if (statisticsFiles != null) {
         tmBuilder.setStatistics(statisticsFiles.get(statisticsFileIndex++));
       }
+      if (partitionStatsFiles != null) {
+        tmBuilder.setPartitionStatistics(partitionStatsFiles.get(partitionStatsFileIndex++));
+      }
     }
     TableMetadata tableMetadata = tmBuilder.build();
     PositionOutputStream out = fileIO.newOutputFile(metadataFile).createOrOverwrite();
@@ -161,4 +180,21 @@ public class TaskTestUtils {
           puffinWriter.writtenBlobsMetadata().stream().map(GenericBlobMetadata::from).toList());
     }
   }
+
+  public static PartitionStatisticsFile writePartitionStatsFile(
+      long snapshotId, String statsLocation, FileIO fileIO) throws UncheckedIOException {
+    PositionOutputStream positionOutputStream;
+    try {
+      positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+      positionOutputStream.close();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return ImmutableGenericPartitionStatisticsFile.builder()
+        .snapshotId(snapshotId)
+        .path(statsLocation)
+        .fileSizeInBytes(42L)
+        .build();
+  }
 }
diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
index f9f1c2f35..ff791bf18 100644
--- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionStatisticsFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
@@ -112,7 +113,6 @@ public class TableCleanupTaskHandler implements TaskHandler {
               metaStoreManager,
               polarisCallContext);
 
-      // TODO: handle partition statistics files
       Stream<TaskEntity> metadataFileCleanupTasks =
           getMetadataTaskStream(
               cleanupTask,
@@ -243,12 +243,13 @@ public class TableCleanupTaskHandler implements TaskHandler {
   private List<List<String>> getMetadataFileBatches(TableMetadata tableMetadata, int batchSize) {
     List<List<String>> result = new ArrayList<>();
     List<String> metadataFiles =
-        Stream.concat(
-                Stream.concat(
-                    tableMetadata.previousFiles().stream()
-                        .map(TableMetadata.MetadataLogEntry::file),
-                    tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation)),
-                tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
+        Stream.of(
+                tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+                tableMetadata.snapshots().stream().map(Snapshot::manifestListLocation),
+                tableMetadata.statisticsFiles().stream().map(StatisticsFile::path),
+                tableMetadata.partitionStatisticsFiles().stream()
+                    .map(PartitionStatisticsFile::path))
+            .flatMap(s -> s)
             .toList();
 
     for (int i = 0; i < metadataFiles.size(); i += batchSize) {

Reply via email to