This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5921421cfe Spark-3.3: Handle statistics files while expiring snapshots
(#6091)
5921421cfe is described below
commit 5921421cfe70346284778fa2d4ebd594ba470205
Author: Ajantha Bhat <[email protected]>
AuthorDate: Sat Mar 11 00:33:36 2023 +0530
Spark-3.3: Handle statistics files while expiring snapshots (#6091)
---
.../apache/iceberg/actions/ExpireSnapshots.java | 9 ++
.../java/org/apache/iceberg/ReachableFileUtil.java | 22 +++-
.../actions/BaseExpireSnapshotsActionResult.java | 2 +
.../extensions/TestExpireSnapshotsProcedure.java | 144 +++++++++++++++++----
.../iceberg/spark/actions/BaseSparkAction.java | 28 ++++
.../spark/actions/ExpireSnapshotsSparkAction.java | 19 +--
.../spark/procedures/ExpireSnapshotsProcedure.java | 7 +-
7 files changed, 193 insertions(+), 38 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
index 0e02f4bec9..286ab115d1 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.SupportsBulkOperations;
+import org.immutables.value.Value;
/**
* An action that expires snapshots in a table.
@@ -29,6 +30,7 @@ import org.apache.iceberg.io.SupportsBulkOperations;
* <p>Similar to {@link org.apache.iceberg.ExpireSnapshots} but may use a
query engine to distribute
* parts of the work.
*/
[email protected]
public interface ExpireSnapshots extends Action<ExpireSnapshots,
ExpireSnapshots.Result> {
/**
* Expires a specific {@link Snapshot} identified by id.
@@ -98,6 +100,7 @@ public interface ExpireSnapshots extends
Action<ExpireSnapshots, ExpireSnapshots
ExpireSnapshots executeDeleteWith(ExecutorService executorService);
/** The action result that contains a summary of the execution. */
+ @Value.Immutable
interface Result {
/** Returns the number of deleted data files. */
long deletedDataFilesCount();
@@ -113,5 +116,11 @@ public interface ExpireSnapshots extends
Action<ExpireSnapshots, ExpireSnapshots
/** Returns the number of deleted manifest lists. */
long deletedManifestListsCount();
+
+ /** Returns the number of deleted statistics files. */
+ @Value.Default
+ default long deletedStatisticsFilesCount() {
+ return 0L;
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index 731eb943b5..60a1f5290a 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -20,6 +20,8 @@ package org.apache.iceberg;
import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
@@ -137,11 +139,21 @@ public class ReachableFileUtil {
* @return the location of statistics files
*/
public static List<String> statisticsFilesLocations(Table table) {
- List<String> statisticsFilesLocations = Lists.newArrayList();
- for (StatisticsFile statisticsFile : table.statisticsFiles()) {
- statisticsFilesLocations.add(statisticsFile.path());
- }
+ return statisticsFilesLocations(table, statisticsFile -> true);
+ }
- return statisticsFilesLocations;
+ /**
+   * Returns locations of statistics files for a table matching the given
predicate.
+ *
+   * @param table table for which statistics files need to be listed
+ * @param predicate predicate for filtering the statistics files
+   * @return the locations of statistics files
+ */
+ public static List<String> statisticsFilesLocations(
+ Table table, Predicate<StatisticsFile> predicate) {
+ return table.statisticsFiles().stream()
+ .filter(predicate)
+ .map(StatisticsFile::path)
+ .collect(Collectors.toList());
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
index 93fd8431b3..d750bcdbc9 100644
---
a/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
+++
b/core/src/main/java/org/apache/iceberg/actions/BaseExpireSnapshotsActionResult.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.actions;
+/** @deprecated will be removed in 1.3.0. */
+@Deprecated
public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result
{
private final long deletedDataFilesCount;
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index e7f648ed6f..efb3d43668 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -20,28 +20,40 @@ package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.TableProperties.GC_ENABLED;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -63,7 +75,8 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
List<Object[]> output = sql("CALL %s.system.expire_snapshots('%s')",
catalogName, tableIdent);
- assertEquals("Should not delete any files", ImmutableList.of(row(0L, 0L,
0L, 0L, 0L)), output);
+ assertEquals(
+ "Should not delete any files", ImmutableList.of(row(0L, 0L, 0L, 0L,
0L, 0L)), output);
}
@Test
@@ -91,7 +104,8 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s')",
catalogName, tableIdent, secondSnapshotTimestamp);
- assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output1);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output1);
table.refresh();
@@ -117,7 +131,8 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
sql(
"CALL %s.system.expire_snapshots('%s', TIMESTAMP '%s', 2)",
catalogName, tableIdent, currentTimestamp);
- assertEquals("Procedure output must match", ImmutableList.of(row(2L, 0L,
0L, 2L, 1L)), output);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(2L, 0L, 0L, 2L,
1L, 0L)), output);
}
@Test
@@ -137,12 +152,10 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
List<Object[]> output =
sql(
- "CALL %s.system.expire_snapshots("
- + "older_than => TIMESTAMP '%s',"
- + "table => '%s',"
- + "retain_last => 1)",
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP
'%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
- assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
}
@Test
@@ -231,12 +244,11 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
- + "max_concurrent_deletes => %s,"
- + "retain_last => 1)",
+ + "max_concurrent_deletes => %s)",
catalogName, currentTimestamp, tableIdent, 4);
assertEquals(
"Expiring snapshots concurrently should succeed",
- ImmutableList.of(row(0L, 0L, 0L, 0L, 3L)),
+ ImmutableList.of(row(0L, 0L, 0L, 0L, 3L, 0L)),
output);
}
@@ -283,7 +295,7 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
.append();
sql("DELETE FROM %s WHERE id=1", tableName);
- Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals(
"Should have 1 delete manifest", 1,
TestHelpers.deleteManifests(table).size());
@@ -318,15 +330,12 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
Timestamp currentTimestamp =
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
List<Object[]> output =
sql(
- "CALL %s.system.expire_snapshots("
- + "older_than => TIMESTAMP '%s',"
- + "table => '%s',"
- + "retain_last => 1)",
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP
'%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
assertEquals(
"Should deleted 1 data and pos delete file and 4 manifests and lists
(one for each txn)",
- ImmutableList.of(row(1L, 1L, 0L, 4L, 4L)),
+ ImmutableList.of(row(1L, 1L, 0L, 4L, 4L, 0L)),
output);
Assert.assertFalse("Delete manifest should be removed",
localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed",
localFs.exists(deleteFilePath));
@@ -352,10 +361,10 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
"CALL %s.system.expire_snapshots("
+ "older_than => TIMESTAMP '%s',"
+ "table => '%s',"
- + "retain_last => 1, "
+ "stream_results => true)",
catalogName, currentTimestamp, tableIdent);
- assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
}
@Test
@@ -434,13 +443,102 @@ public class TestExpireSnapshotsProcedure extends
SparkExtensionsTestBase {
+ "/* And comments that span *multiple* \n"
+ " lines */ CALL /* this is the actual CALL */
%s.system.expire_snapshots("
+ " older_than => TIMESTAMP '%s',"
- + " table => '%s',"
- + " retain_last => 1)";
+ + " table => '%s')";
List<Object[]> output = sql(callStatement, catalogName, currentTimestamp,
tableIdent);
- assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L,
0L, 0L, 1L)), output);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
table.refresh();
Assert.assertEquals("Should be 1 snapshot remaining", 1,
Iterables.size(table.snapshots()));
}
+
+ @Test
+ public void testExpireSnapshotsWithStatisticFiles() throws Exception {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ String statsFileLocation1 = statsFileLocation(table.location());
+ StatisticsFile statisticsFile1 =
+ writeStatsFile(
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ statsFileLocation1,
+ table.io());
+ table.updateStatistics().setStatistics(statisticsFile1.snapshotId(),
statisticsFile1).commit();
+
+ sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+ table.refresh();
+ String statsFileLocation2 = statsFileLocation(table.location());
+ StatisticsFile statisticsFile2 =
+ writeStatsFile(
+ table.currentSnapshot().snapshotId(),
+ table.currentSnapshot().sequenceNumber(),
+ statsFileLocation2,
+ table.io());
+ table.updateStatistics().setStatistics(statisticsFile2.snapshotId(),
statisticsFile2).commit();
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Timestamp currentTimestamp =
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP
'%s',table => '%s')",
+ catalogName, currentTimestamp, tableIdent);
+ Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics
file").isEqualTo(1L);
+
+ table.refresh();
+ List<StatisticsFile> statsWithSnapshotId1 =
+ table.statisticsFiles().stream()
+ .filter(statisticsFile -> statisticsFile.snapshotId() ==
statisticsFile1.snapshotId())
+ .collect(Collectors.toList());
+ Assertions.assertThat(statsWithSnapshotId1)
+ .as(
+ "Statistics file entry in TableMetadata should be deleted for the
snapshot %s",
+ statisticsFile1.snapshotId())
+ .isEmpty();
+ Assertions.assertThat(table.statisticsFiles())
+ .as(
+ "Statistics file entry in TableMetadata should be present for the
snapshot %s",
+ statisticsFile2.snapshotId())
+ .extracting(StatisticsFile::snapshotId)
+ .containsExactly(statisticsFile2.snapshotId());
+
+ Assertions.assertThat(new File(statsFileLocation1))
+ .as("Statistics file should not exist for snapshot %s",
statisticsFile1.snapshotId())
+ .doesNotExist();
+
+ Assertions.assertThat(new File(statsFileLocation2))
+ .as("Statistics file should exist for snapshot %s",
statisticsFile2.snapshotId())
+ .exists();
+ }
+
+ private StatisticsFile writeStatsFile(
+ long snapshotId, long snapshotSequenceNumber, String statsLocation,
FileIO fileIO)
+ throws IOException {
+ try (PuffinWriter puffinWriter =
Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+ puffinWriter.add(
+ new Blob(
+ "some-blob-type",
+ ImmutableList.of(1),
+ snapshotId,
+ snapshotSequenceNumber,
+ ByteBuffer.wrap("blob
content".getBytes(StandardCharsets.UTF_8))));
+ puffinWriter.finish();
+
+ return new GenericStatisticsFile(
+ snapshotId,
+ statsLocation,
+ puffinWriter.fileSize(),
+ puffinWriter.footerSize(),
+ puffinWriter.writtenBlobsMetadata().stream()
+ .map(GenericBlobMetadata::from)
+ .collect(ImmutableList.toImmutableList()));
+ }
+ }
+
+ private String statsFileLocation(String tableLocation) {
+ String statsFileName = "stats-file-" + UUID.randomUUID();
+ return tableLocation.replaceFirst("file:", "") + "/metadata/" +
statsFileName;
+ }
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 1b285e8cac..3c007c6214 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
@@ -43,6 +44,7 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
@@ -80,6 +82,7 @@ abstract class BaseSparkAction<ThisT> {
protected static final String MANIFEST = "Manifest";
protected static final String MANIFEST_LIST = "Manifest List";
+ protected static final String STATISTICS_FILES = "Statistics Files";
protected static final String OTHERS = "Others";
protected static final String FILE_PATH = "file_path";
@@ -200,6 +203,18 @@ abstract class BaseSparkAction<ThisT> {
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
+ protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long>
snapshotIds) {
+ Predicate<StatisticsFile> predicate;
+ if (snapshotIds == null) {
+ predicate = statisticsFile -> true;
+ } else {
+ predicate = statisticsFile ->
snapshotIds.contains(statisticsFile.snapshotId());
+ }
+
+ List<String> statisticsFiles =
ReachableFileUtil.statisticsFilesLocations(table, predicate);
+ return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
+ }
+
protected Dataset<FileInfo> otherMetadataFileDS(Table table) {
return otherMetadataFileDS(table, false /* include all reachable old
metadata locations */);
}
@@ -297,6 +312,7 @@ abstract class BaseSparkAction<ThisT> {
private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
private final AtomicLong manifestsCount = new AtomicLong(0L);
private final AtomicLong manifestListsCount = new AtomicLong(0L);
+ private final AtomicLong statisticsFilesCount = new AtomicLong(0L);
private final AtomicLong otherFilesCount = new AtomicLong(0L);
public void deletedFiles(String type, int numFiles) {
@@ -315,6 +331,9 @@ abstract class BaseSparkAction<ThisT> {
} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
manifestListsCount.addAndGet(numFiles);
+ } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+ statisticsFilesCount.addAndGet(numFiles);
+
} else if (OTHERS.equalsIgnoreCase(type)) {
otherFilesCount.addAndGet(numFiles);
@@ -344,6 +363,10 @@ abstract class BaseSparkAction<ThisT> {
manifestListsCount.incrementAndGet();
LOG.debug("Deleted manifest list: {}", path);
+ } else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
+ statisticsFilesCount.incrementAndGet();
+ LOG.debug("Deleted statistics file: {}", path);
+
} else if (OTHERS.equalsIgnoreCase(type)) {
otherFilesCount.incrementAndGet();
LOG.debug("Deleted other metadata file: {}", path);
@@ -373,6 +396,10 @@ abstract class BaseSparkAction<ThisT> {
return manifestListsCount.get();
}
+ public long statisticsFilesCount() {
+ return statisticsFilesCount.get();
+ }
+
public long otherFilesCount() {
return otherFilesCount.get();
}
@@ -383,6 +410,7 @@ abstract class BaseSparkAction<ThisT> {
+ equalityDeleteFilesCount()
+ manifestsCount()
+ manifestListsCount()
+ + statisticsFilesCount()
+ otherFilesCount();
}
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 95e153a9a5..a1db08663e 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -32,8 +32,8 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.actions.BaseExpireSnapshotsActionResult;
import org.apache.iceberg.actions.ExpireSnapshots;
+import org.apache.iceberg.actions.ImmutableExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -245,7 +245,8 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
Table staticTable = newStaticTable(metadata, table.io());
return contentFileDS(staticTable, snapshotIds)
.union(manifestDS(staticTable, snapshotIds))
- .union(manifestListDS(staticTable, snapshotIds));
+ .union(manifestListDS(staticTable, snapshotIds))
+ .union(statisticsFileDS(staticTable, snapshotIds));
}
private Set<Long> findExpiredSnapshotIds(
@@ -277,11 +278,13 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
LOG.info("Deleted {} total files", summary.totalFilesCount());
- return new BaseExpireSnapshotsActionResult(
- summary.dataFilesCount(),
- summary.positionDeleteFilesCount(),
- summary.equalityDeleteFilesCount(),
- summary.manifestsCount(),
- summary.manifestListsCount());
+ return ImmutableExpireSnapshots.Result.builder()
+ .deletedDataFilesCount(summary.dataFilesCount())
+ .deletedPositionDeleteFilesCount(summary.positionDeleteFilesCount())
+ .deletedEqualityDeleteFilesCount(summary.equalityDeleteFilesCount())
+ .deletedManifestsCount(summary.manifestsCount())
+ .deletedManifestListsCount(summary.manifestListsCount())
+ .deletedStatisticsFilesCount(summary.statisticsFilesCount())
+ .build();
}
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index a66310f493..9d2fc7e467 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -67,7 +67,9 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
new StructField(
"deleted_manifest_files_count", DataTypes.LongType, true,
Metadata.empty()),
new StructField(
- "deleted_manifest_lists_count", DataTypes.LongType, true,
Metadata.empty())
+ "deleted_manifest_lists_count", DataTypes.LongType, true,
Metadata.empty()),
+ new StructField(
+ "deleted_statistics_files_count", DataTypes.LongType, true,
Metadata.empty())
});
public static ProcedureBuilder builder() {
@@ -159,7 +161,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure
{
result.deletedPositionDeleteFilesCount(),
result.deletedEqualityDeleteFilesCount(),
result.deletedManifestsCount(),
- result.deletedManifestListsCount());
+ result.deletedManifestListsCount(),
+ result.deletedStatisticsFilesCount());
return new InternalRow[] {row};
}