This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5921421cfe Spark-3.3: Handle statistics files while expiring snapshots 
(#6091)
5921421cfe is described below

commit 5921421cfe70346284778fa2d4ebd594ba470205
Author: Ajantha Bhat <[email protected]>
AuthorDate: Sat Mar 11 00:33:36 2023 +0530

    Spark-3.3: Handle statistics files while expiring snapshots (#6091)
---
 .../apache/iceberg/actions/ExpireSnapshots.java    |   9 ++
 .../java/org/apache/iceberg/ReachableFileUtil.java |  22 +++-
 .../actions/BaseExpireSnapshotsActionResult.java   |   2 +
 .../extensions/TestExpireSnapshotsProcedure.java   | 144 +++++++++++++++++----
 .../iceberg/spark/actions/BaseSparkAction.java     |  28 ++++
 .../spark/actions/ExpireSnapshotsSparkAction.java  |  19 +--
 .../spark/procedures/ExpireSnapshotsProcedure.java |   7 +-
 7 files changed, 193 insertions(+), 38 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java 
b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
index 0e02f4bec9..286ab115d1 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.io.SupportsBulkOperations;
+import org.immutables.value.Value;
 
 /**
  * An action that expires snapshots in a table.
@@ -29,6 +30,7 @@ import org.apache.iceberg.io.SupportsBulkOperations;
  * <p>Similar to {@link org.apache.iceberg.ExpireSnapshots} but may use a 
query engine to distribute
  * parts of the work.
  */
[email protected]
 public interface ExpireSnapshots extends Action<ExpireSnapshots, 
ExpireSnapshots.Result> {
   /**
    * Expires a specific {@link Snapshot} identified by id.
@@ -98,6 +100,7 @@ public interface ExpireSnapshots extends 
Action<ExpireSnapshots, ExpireSnapshots
   ExpireSnapshots executeDeleteWith(ExecutorService executorService);
 
   /** The action result that contains a summary of the execution. */
+  @Value.Immutable
   interface Result {
     /** Returns the number of deleted data files. */
     long deletedDataFilesCount();
@@ -113,5 +116,11 @@ public interface ExpireSnapshots extends 
Action<ExpireSnapshots, ExpireSnapshots
 
     /** Returns the number of deleted manifest lists. */
     long deletedManifestListsCount();
+
+    /** Returns the number of deleted statistics files. */
+    @Value.Default
+    default long deletedStatisticsFilesCount() {
+      return 0L;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java 
b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index 731eb943b5..60a1f5290a 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -20,6 +20,8 @@ package org.apache.iceberg;
 
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TableMetadata.MetadataLogEntry;
 import org.apache.iceberg.hadoop.Util;
@@ -137,11 +139,21 @@ public class ReachableFileUtil {
    * @return the location of statistics files
    */
   public static List<String> statisticsFilesLocations(Table table) {
-    List<String> statisticsFilesLocations = Lists.newArrayList();
-    for (StatisticsFile statisticsFile : table.statisticsFiles()) {
-      statisticsFilesLocations.add(statisticsFile.path());
-    }
+    return statisticsFilesLocations(table, statisticsFile -> true);
+  }
 
-    return statisticsFilesLocations;
+  /**
+   * Returns locations of statistics files for a table matching the given predicate.
+   *
+   * @param table table for which statistics files need to be listed
+   * @param predicate predicate for filtering the statistics files
+   * @return the locations of statistics files
+   */
+  public static List<String> statisticsFilesLocations(
+      Table table, Predicate<StatisticsFile> predicate) {
+    return table.statisticsFiles().stream()
+        .filter(predicate)
+        .map(StatisticsFile::path)
+        .collect(Collectors.toList());
   }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
 
b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
index 93fd8431b3..d750bcdbc9 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.actions;
 
+/** @deprecated will be removed in 1.3.0. */
+@Deprecated
 public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result 
{
 
   private final long deletedDataFilesCount;
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index e7f648ed6f..efb3d43668 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -20,28 +20,40 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -63,7 +75,8 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
 
     List<Object[]> output = sql("CALL %s.system.expire_snapshots('%s')", 
catalogName, tableIdent);
-    assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L, 
0L, 0L, 0L)), output);
+    assertEquals(
+        "Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L, 
0L, 0L)), output);
   }
 
   @Test
@@ -91,7 +104,8 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         sql(
             "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
             catalogName, tableIdent, secondSnapshotTimestamp);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 
0L, 0L, 1L)), output1);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 
1L, 0L)), output1);
 
     table.refresh();
 
@@ -117,7 +131,8 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         sql(
             "CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
             catalogName, tableIdent, currentTimestamp);
-    assertEquals("Procedure output must match", ImmutableList.of(row(2L, 0L, 
0L, 2L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L, 
1L, 0L)), output);
   }
 
   @Test
@@ -137,12 +152,10 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
 
     List<Object[]> output =
         sql(
-            "CALL %s.system.expire_snapshots("
-                + "older_than => TIMESTAMP '%s',"
-                + "table => '%s',"
-                + "retain_last => 1)",
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
             catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 
0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 
1L, 0L)), output);
   }
 
   @Test
@@ -231,12 +244,11 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
             "CALL %s.system.expire_snapshots("
                 + "older_than => TIMESTAMP '%s',"
                 + "table => '%s',"
-                + "max_concurrent_deletes => %s,"
-                + "retain_last => 1)",
+                + "max_concurrent_deletes => %s)",
             catalogName, currentTimestamp, tableIdent, 4);
     assertEquals(
         "Expiring snapshots concurrently should succeed",
-        ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)),
+        ImmutableList.of(row(0L, 0L, 0L, 0L, 3L, 0L)),
         output);
   }
 
@@ -283,7 +295,7 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
         .append();
     sql("DELETE FROM %s WHERE id=1", tableName);
 
-    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
 
     Assert.assertEquals(
         "Should have 1 delete manifest", 1, 
TestHelpers.deleteManifests(table).size());
@@ -318,15 +330,12 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
     Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
     List<Object[]> output =
         sql(
-            "CALL %s.system.expire_snapshots("
-                + "older_than => TIMESTAMP '%s',"
-                + "table => '%s',"
-                + "retain_last => 1)",
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
             catalogName, currentTimestamp, tableIdent);
 
     assertEquals(
         "Should deleted 1 data and pos delete file and 4 manifests and lists 
(one for each txn)",
-        ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)),
+        ImmutableList.of(row(1L, 1L, 0L, 4L, 4L, 0L)),
         output);
     Assert.assertFalse("Delete manifest should be removed", 
localFs.exists(deleteManifestPath));
     Assert.assertFalse("Delete file should be removed", 
localFs.exists(deleteFilePath));
@@ -352,10 +361,10 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
             "CALL %s.system.expire_snapshots("
                 + "older_than => TIMESTAMP '%s',"
                 + "table => '%s',"
-                + "retain_last => 1, "
                 + "stream_results => true)",
             catalogName, currentTimestamp, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 
0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 
1L, 0L)), output);
   }
 
   @Test
@@ -434,13 +443,102 @@ public class TestExpireSnapshotsProcedure extends 
SparkExtensionsTestBase {
             + "/* And comments that span *multiple* \n"
             + " lines */ CALL /* this is the actual CALL */ 
%s.system.expire_snapshots("
             + "   older_than => TIMESTAMP '%s',"
-            + "   table => '%s',"
-            + "   retain_last => 1)";
+            + "   table => '%s')";
     List<Object[]> output = sql(callStatement, catalogName, currentTimestamp, 
tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 
0L, 0L, 1L)), output);
+    assertEquals(
+        "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L, 
1L, 0L)), output);
 
     table.refresh();
 
     Assert.assertEquals("Should be 1 snapshot remaining", 1, 
Iterables.size(table.snapshots()));
   }
+
+  @Test
+  public void testExpireSnapshotsWithStatisticFiles() throws Exception {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    String statsFileLocation1 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile1 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation1,
+            table.io());
+    table.updateStatistics().setStatistics(statisticsFile1.snapshotId(), 
statisticsFile1).commit();
+
+    sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+    table.refresh();
+    String statsFileLocation2 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile2 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation2,
+            table.io());
+    table.updateStatistics().setStatistics(statisticsFile2.snapshotId(), 
statisticsFile2).commit();
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.expire_snapshots(older_than => TIMESTAMP 
'%s',table => '%s')",
+            catalogName, currentTimestamp, tableIdent);
+    Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics 
file").isEqualTo(1L);
+
+    table.refresh();
+    List<StatisticsFile> statsWithSnapshotId1 =
+        table.statisticsFiles().stream()
+            .filter(statisticsFile -> statisticsFile.snapshotId() == 
statisticsFile1.snapshotId())
+            .collect(Collectors.toList());
+    Assertions.assertThat(statsWithSnapshotId1)
+        .as(
+            "Statistics file entry in TableMetadata should be deleted for the 
snapshot %s",
+            statisticsFile1.snapshotId())
+        .isEmpty();
+    Assertions.assertThat(table.statisticsFiles())
+        .as(
+            "Statistics file entry in TableMetadata should be present for the 
snapshot %s",
+            statisticsFile2.snapshotId())
+        .extracting(StatisticsFile::snapshotId)
+        .containsExactly(statisticsFile2.snapshotId());
+
+    Assertions.assertThat(new File(statsFileLocation1))
+        .as("Statistics file should not exist for snapshot %s", 
statisticsFile1.snapshotId())
+        .doesNotExist();
+
+    Assertions.assertThat(new File(statsFileLocation2))
+        .as("Statistics file should exist for snapshot %s", 
statisticsFile2.snapshotId())
+        .exists();
+  }
+
+  private StatisticsFile writeStatsFile(
+      long snapshotId, long snapshotSequenceNumber, String statsLocation, 
FileIO fileIO)
+      throws IOException {
+    try (PuffinWriter puffinWriter = 
Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob 
content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+
+      return new GenericStatisticsFile(
+          snapshotId,
+          statsLocation,
+          puffinWriter.fileSize(),
+          puffinWriter.footerSize(),
+          puffinWriter.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList()));
+    }
+  }
+
+  private String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation.replaceFirst("file:", "") + "/metadata/" + 
statsFileName;
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 1b285e8cac..3c007c6214 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
@@ -43,6 +44,7 @@ import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.exceptions.NotFoundException;
@@ -80,6 +82,7 @@ abstract class BaseSparkAction<ThisT> {
 
   protected static final String MANIFEST = "Manifest";
   protected static final String MANIFEST_LIST = "Manifest List";
+  protected static final String STATISTICS_FILES = "Statistics Files";
   protected static final String OTHERS = "Others";
 
   protected static final String FILE_PATH = "file_path";
@@ -200,6 +203,18 @@ abstract class BaseSparkAction<ThisT> {
     return toFileInfoDS(manifestLists, MANIFEST_LIST);
   }
 
+  protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> 
snapshotIds) {
+    Predicate<StatisticsFile> predicate;
+    if (snapshotIds == null) {
+      predicate = statisticsFile -> true;
+    } else {
+      predicate = statisticsFile -> 
snapshotIds.contains(statisticsFile.snapshotId());
+    }
+
+    List<String> statisticsFiles = 
ReachableFileUtil.statisticsFilesLocations(table, predicate);
+    return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
+  }
+
   protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
     return otherMetadataFileDS(table, false /* include all reachable old 
metadata locations */);
   }
@@ -297,6 +312,7 @@ abstract class BaseSparkAction<ThisT> {
     private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
     private final AtomicLong manifestsCount = new AtomicLong(0L);
     private final AtomicLong manifestListsCount = new AtomicLong(0L);
+    private final AtomicLong statisticsFilesCount = new AtomicLong(0L);
     private final AtomicLong otherFilesCount = new AtomicLong(0L);
 
     public void deletedFiles(String type, int numFiles) {
@@ -315,6 +331,9 @@ abstract class BaseSparkAction<ThisT> {
       } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
         manifestListsCount.addAndGet(numFiles);
 
+      } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+        statisticsFilesCount.addAndGet(numFiles);
+
       } else if (OTHERS.equalsIgnoreCase(type)) {
         otherFilesCount.addAndGet(numFiles);
 
@@ -344,6 +363,10 @@ abstract class BaseSparkAction<ThisT> {
         manifestListsCount.incrementAndGet();
         LOG.debug("Deleted manifest list: {}", path);
 
+      } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+        statisticsFilesCount.incrementAndGet();
+        LOG.debug("Deleted statistics file: {}", path);
+
       } else if (OTHERS.equalsIgnoreCase(type)) {
         otherFilesCount.incrementAndGet();
         LOG.debug("Deleted other metadata file: {}", path);
@@ -373,6 +396,10 @@ abstract class BaseSparkAction<ThisT> {
       return manifestListsCount.get();
     }
 
+    public long statisticsFilesCount() {
+      return statisticsFilesCount.get();
+    }
+
     public long otherFilesCount() {
       return otherFilesCount.get();
     }
@@ -383,6 +410,7 @@ abstract class BaseSparkAction<ThisT> {
           + equalityDeleteFilesCount()
           + manifestsCount()
           + manifestListsCount()
+          + statisticsFilesCount()
           + otherFilesCount();
     }
   }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 95e153a9a5..a1db08663e 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -32,8 +32,8 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
 import org.apache.iceberg.actions.ExpireSnapshots;
+import org.apache.iceberg.actions.ImmutableExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -245,7 +245,8 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
     Table staticTable = newStaticTable(metadata, table.io());
     return contentFileDS(staticTable, snapshotIds)
         .union(manifestDS(staticTable, snapshotIds))
-        .union(manifestListDS(staticTable, snapshotIds));
+        .union(manifestListDS(staticTable, snapshotIds))
+        .union(statisticsFileDS(staticTable, snapshotIds));
   }
 
   private Set<Long> findExpiredSnapshotIds(
@@ -277,11 +278,13 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
 
     LOG.info("Deleted {} total files", summary.totalFilesCount());
 
-    return new BaseExpireSnapshotsActionResult(
-        summary.dataFilesCount(),
-        summary.positionDeleteFilesCount(),
-        summary.equalityDeleteFilesCount(),
-        summary.manifestsCount(),
-        summary.manifestListsCount());
+    return ImmutableExpireSnapshots.Result.builder()
+        .deletedDataFilesCount(summary.dataFilesCount())
+        .deletedPositionDeleteFilesCount(summary.positionDeleteFilesCount())
+        .deletedEqualityDeleteFilesCount(summary.equalityDeleteFilesCount())
+        .deletedManifestsCount(summary.manifestsCount())
+        .deletedManifestListsCount(summary.manifestListsCount())
+        .deletedStatisticsFilesCount(summary.statisticsFilesCount())
+        .build();
   }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index a66310f493..9d2fc7e467 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -67,7 +67,9 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
             new StructField(
                 "deleted_manifest_files_count", DataTypes.LongType, true, 
Metadata.empty()),
             new StructField(
-                "deleted_manifest_lists_count", DataTypes.LongType, true, 
Metadata.empty())
+                "deleted_manifest_lists_count", DataTypes.LongType, true, 
Metadata.empty()),
+            new StructField(
+                "deleted_statistics_files_count", DataTypes.LongType, true, 
Metadata.empty())
           });
 
   public static ProcedureBuilder builder() {
@@ -159,7 +161,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure 
{
             result.deletedPositionDeleteFilesCount(),
             result.deletedEqualityDeleteFilesCount(),
             result.deletedManifestsCount(),
-            result.deletedManifestListsCount());
+            result.deletedManifestListsCount(),
+            result.deletedStatisticsFilesCount());
     return new InternalRow[] {row};
   }
 

Reply via email to