This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 02836eaac8 Spark: Ensure partition stats files are considered for GC procedures (#9284)
02836eaac8 is described below
commit 02836eaac8c8cd18f9998d0ae4411b09d3649e1a
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jan 19 00:40:53 2024 +0530
Spark: Ensure partition stats files are considered for GC procedures (#9284)
---
.../java/org/apache/iceberg/ReachableFileUtil.java | 42 +++++++++++-
.../iceberg/spark/extensions/ProcedureUtil.java | 54 +++++++++++++++
.../extensions/TestExpireSnapshotsProcedure.java | 76 ++++++++++++++++------
.../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
.../iceberg/spark/actions/BaseSparkAction.java | 12 +---
.../iceberg/spark/extensions/ProcedureUtil.java | 54 +++++++++++++++
.../extensions/TestExpireSnapshotsProcedure.java | 76 ++++++++++++++++------
.../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
.../iceberg/spark/actions/BaseSparkAction.java | 12 +---
.../iceberg/spark/extensions/ProcedureUtil.java | 54 +++++++++++++++
.../extensions/TestExpireSnapshotsProcedure.java | 76 ++++++++++++++++------
.../extensions/TestRemoveOrphanFilesProcedure.java | 70 ++++++++++++++++++++
.../iceberg/spark/actions/BaseSparkAction.java | 12 +---
13 files changed, 586 insertions(+), 92 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index bd23a221ab..ee1ff28e03 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -133,13 +133,13 @@ public class ReachableFileUtil {
}
/**
- * Returns locations of statistics files in a table.
+ * Returns locations of all statistics files in a table.
*
* @param table table for which statistics files needs to be listed
* @return the location of statistics files
*/
public static List<String> statisticsFilesLocations(Table table) {
- return statisticsFilesLocations(table, statisticsFile -> true);
+ return statisticsFilesLocationsForSnapshots(table, null);
}
/**
@@ -148,7 +148,10 @@ public class ReachableFileUtil {
* @param table table for which statistics files needs to be listed
* @param predicate predicate for filtering the statistics files
* @return the location of statistics files
+ * @deprecated since 1.5.0, will be removed in 1.6.0; use the {@code
+ * statisticsFilesLocationsForSnapshots(table, snapshotIds)} instead.
*/
+ @Deprecated
public static List<String> statisticsFilesLocations(
Table table, Predicate<StatisticsFile> predicate) {
return table.statisticsFiles().stream()
@@ -156,4 +159,39 @@ public class ReachableFileUtil {
.map(StatisticsFile::path)
.collect(Collectors.toList());
}
+
+ /**
+ * Returns locations of all statistics files for a table matching the given snapshot IDs.
+ *
+ * @param table table for which statistics files needs to be listed
+ * @param snapshotIds ids of snapshots for which statistics files will be returned. If null,
+ * statistics files for all the snapshots will be returned.
+ * @return the location of statistics files
+ */
+ public static List<String> statisticsFilesLocationsForSnapshots(
+ Table table, Set<Long> snapshotIds) {
+ List<String> statsFileLocations = Lists.newArrayList();
+
+ Predicate<StatisticsFile> statsFilePredicate;
+ Predicate<PartitionStatisticsFile> partitionStatsFilePredicate;
+ if (snapshotIds == null) {
+ statsFilePredicate = file -> true;
+ partitionStatsFilePredicate = file -> true;
+ } else {
+ statsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
+ partitionStatsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
+ }
+
+ table.statisticsFiles().stream()
+ .filter(statsFilePredicate)
+ .map(StatisticsFile::path)
+ .forEach(statsFileLocations::add);
+
+ table.partitionStatisticsFiles().stream()
+ .filter(partitionStatsFilePredicate)
+ .map(PartitionStatisticsFile::path)
+ .forEach(statsFileLocations::add);
+
+ return statsFileLocations;
+ }
}
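For reference, a minimal usage sketch of the new ReachableFileUtil API added above (illustrative only, not part of this commit; the HadoopTables load path and the example class name are made-up assumptions):

    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.ReachableFileUtil;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;

    public class StatsFileLocationsExample {
      public static void main(String[] args) {
        // Hypothetical filesystem table; any Table instance works the same way.
        Table table = new HadoopTables(new Configuration()).load("/tmp/warehouse/db/tbl");

        // Passing null returns locations of all statistics files and all
        // partition statistics files tracked in table metadata.
        List<String> allStatsFiles =
            ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, null);

        // Passing a set of snapshot IDs limits the result to files attached
        // to those snapshots, which is what the GC actions rely on.
        Set<Long> liveSnapshotIds = Sets.newHashSet();
        table.snapshots().forEach(snapshot -> liveSnapshotIds.add(snapshot.snapshotId()));
        List<String> liveStatsFiles =
            ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, liveSnapshotIds);

        System.out.println("all stats files: " + allStatsFiles);
        System.out.println("stats files for live snapshots: " + liveStatsFiles);
      }
    }

The deprecated predicate-based statisticsFilesLocations(table, predicate) only covered StatisticsFile entries; the snapshot-ID variant also walks partitionStatisticsFiles(), which is why the Spark GC code below switches to it.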
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+ private ProcedureUtil() {}
+
+ static PartitionStatisticsFile writePartitionStatsFile(
+ long snapshotId, String statsLocation, FileIO fileIO) {
+ PositionOutputStream positionOutputStream;
+ try {
+ positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+ positionOutputStream.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return ImmutableGenericPartitionStatisticsFile.builder()
+ .snapshotId(snapshotId)
+ .fileSizeInBytes(42L)
+ .path(statsLocation)
+ .build();
+ }
+
+ static String statsFileLocation(String tableLocation) {
+ String statsFileName = "stats-file-" + UUID.randomUUID();
+ return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
+ }
+}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index efb3d43668..6383521a44 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,14 +28,13 @@ import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -458,7 +457,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
- String statsFileLocation1 = statsFileLocation(table.location());
+ String statsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -469,7 +468,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
- String statsFileLocation2 = statsFileLocation(table.location());
+ String statsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -488,18 +487,9 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);
table.refresh();
- List<StatisticsFile> statsWithSnapshotId1 =
- table.statisticsFiles().stream()
- .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
- .collect(Collectors.toList());
- Assertions.assertThat(statsWithSnapshotId1)
- .as(
- "Statistics file entry in TableMetadata should be deleted for the snapshot %s",
- statisticsFile1.snapshotId())
- .isEmpty();
Assertions.assertThat(table.statisticsFiles())
.as(
- "Statistics file entry in TableMetadata should be present for the snapshot %s",
+ "Statistics file entry in TableMetadata should be present only for the snapshot %s",
statisticsFile2.snapshotId())
.extracting(StatisticsFile::snapshotId)
.containsExactly(statisticsFile2.snapshotId());
@@ -513,7 +503,58 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.exists();
}
- private StatisticsFile writeStatsFile(
+ @Test
+ public void testExpireSnapshotsWithPartitionStatisticFiles() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ String partitionStatsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile1 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+ sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+ table.refresh();
+ String partitionStatsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile2 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
+ catalogName, currentTimestamp, tableIdent);
+ Assertions.assertThat(output.get(0)[5])
+ .as("should be 1 deleted partition statistics file")
+ .isEqualTo(1L);
+
+ table.refresh();
+ Assertions.assertThat(table.partitionStatisticsFiles())
+ .as(
+ "partition statistics file entry in TableMetadata should be present only for the snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .extracting(PartitionStatisticsFile::snapshotId)
+ .containsExactly(partitionStatisticsFile2.snapshotId());
+
+ Assertions.assertThat(new File(partitionStatsFileLocation1))
+ .as(
+ "partition statistics file should not exist for snapshot %s",
+ partitionStatisticsFile1.snapshotId())
+ .doesNotExist();
+
+ Assertions.assertThat(new File(partitionStatsFileLocation2))
+ .as(
+ "partition statistics file should exist for snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .exists();
+ }
+
+ private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -536,9 +577,4 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.collect(ImmutableList.toImmutableList()));
}
}
-
- private String statsFileLocation(String tableLocation) {
- String statsFileName = "stats-file-" + UUID.randomUUID();
- return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
- }
}
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index be82880cb7..05eb7a6f80 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -541,6 +542,75 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
}
+ @Test
+ public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception {
+ sql(
+ "CREATE TABLE %s USING iceberg "
+ + "TBLPROPERTIES('format-version'='2') "
+ + "AS SELECT 10 int, 'abc' data",
+ tableName);
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io());
+
+ commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+ // wait to ensure files are old enough
+ waitUntilAfter(System.currentTimeMillis());
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should exist")
+ .exists();
+
+ removePartitionStatsTxn(table, partitionStatisticsFile);
+
+ output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+ Assertions.assertThat(Iterables.getOnlyElement(output))
+ .as("Deleted files")
+ .containsExactly("file:" + partitionStatsLocation);
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should be deleted")
+ .doesNotExist();
+ }
+
+ private static void removePartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+ .commit();
+ transaction.commitTransaction();
+ }
+
+ private static void commitPartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .setPartitionStatistics(partitionStatisticsFile)
+ .commit();
+ transaction.commitTransaction();
+ }
+
@Test
public void testRemoveOrphanFilesProcedureWithPrefixMode()
throws NoSuchTableException, ParseException, IOException {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 3c007c6214..c5b8083583 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
@@ -204,14 +202,8 @@ abstract class BaseSparkAction<ThisT> {
}
protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
- Predicate<StatisticsFile> predicate;
- if (snapshotIds == null) {
- predicate = statisticsFile -> true;
- } else {
- predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
- }
-
- List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
+ List<String> statisticsFiles =
+ ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, snapshotIds);
return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+ private ProcedureUtil() {}
+
+ static PartitionStatisticsFile writePartitionStatsFile(
+ long snapshotId, String statsLocation, FileIO fileIO) {
+ PositionOutputStream positionOutputStream;
+ try {
+ positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+ positionOutputStream.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return ImmutableGenericPartitionStatisticsFile.builder()
+ .snapshotId(snapshotId)
+ .fileSizeInBytes(42L)
+ .path(statsLocation)
+ .build();
+ }
+
+ static String statsFileLocation(String tableLocation) {
+ String statsFileName = "stats-file-" + UUID.randomUUID();
+ return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
+ }
+}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index c4b93c6d6a..7dacce5487 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,13 +28,12 @@ import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -445,7 +444,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
- String statsFileLocation1 = statsFileLocation(table.location());
+ String statsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -456,7 +455,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
- String statsFileLocation2 = statsFileLocation(table.location());
+ String statsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -475,18 +474,9 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);
table.refresh();
- List<StatisticsFile> statsWithSnapshotId1 =
- table.statisticsFiles().stream()
- .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
- .collect(Collectors.toList());
- Assertions.assertThat(statsWithSnapshotId1)
- .as(
- "Statistics file entry in TableMetadata should be deleted for the snapshot %s",
- statisticsFile1.snapshotId())
- .isEmpty();
Assertions.assertThat(table.statisticsFiles())
.as(
- "Statistics file entry in TableMetadata should be present for the snapshot %s",
+ "Statistics file entry in TableMetadata should be present only for the snapshot %s",
statisticsFile2.snapshotId())
.extracting(StatisticsFile::snapshotId)
.containsExactly(statisticsFile2.snapshotId());
@@ -500,7 +490,58 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.exists();
}
- private StatisticsFile writeStatsFile(
+ @Test
+ public void testExpireSnapshotsWithPartitionStatisticFiles() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ String partitionStatsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile1 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+ sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+ table.refresh();
+ String partitionStatsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile2 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
+ catalogName, currentTimestamp, tableIdent);
+ Assertions.assertThat(output.get(0)[5])
+ .as("should be 1 deleted partition statistics file")
+ .isEqualTo(1L);
+
+ table.refresh();
+ Assertions.assertThat(table.partitionStatisticsFiles())
+ .as(
+ "partition statistics file entry in TableMetadata should be present only for the snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .extracting(PartitionStatisticsFile::snapshotId)
+ .containsExactly(partitionStatisticsFile2.snapshotId());
+
+ Assertions.assertThat(new File(partitionStatsFileLocation1))
+ .as(
+ "partition statistics file should not exist for snapshot %s",
+ partitionStatisticsFile1.snapshotId())
+ .doesNotExist();
+
+ Assertions.assertThat(new File(partitionStatsFileLocation2))
+ .as(
+ "partition statistics file should exist for snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .exists();
+ }
+
+ private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -523,9 +564,4 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.collect(ImmutableList.toImmutableList()));
}
}
-
- private String statsFileLocation(String tableLocation) {
- String statsFileName = "stats-file-" + UUID.randomUUID();
- return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
- }
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index b299817750..40adf30c37 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -530,6 +531,75 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
}
+ @Test
+ public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception {
+ sql(
+ "CREATE TABLE %s USING iceberg "
+ + "TBLPROPERTIES('format-version'='2') "
+ + "AS SELECT 10 int, 'abc' data",
+ tableName);
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io());
+
+ commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+ // wait to ensure files are old enough
+ waitUntilAfter(System.currentTimeMillis());
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should exist")
+ .exists();
+
+ removePartitionStatsTxn(table, partitionStatisticsFile);
+
+ output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+ Assertions.assertThat(Iterables.getOnlyElement(output))
+ .as("Deleted files")
+ .containsExactly("file:" + partitionStatsLocation);
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should be deleted")
+ .doesNotExist();
+ }
+
+ private static void removePartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+ .commit();
+ transaction.commitTransaction();
+ }
+
+ private static void commitPartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .setPartitionStatistics(partitionStatisticsFile)
+ .commit();
+ transaction.commitTransaction();
+ }
+
@Test
public void testRemoveOrphanFilesProcedureWithPrefixMode()
throws NoSuchTableException, ParseException, IOException {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 62f5167526..cff07c05d4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
@@ -197,14 +195,8 @@ abstract class BaseSparkAction<ThisT> {
}
protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
- Predicate<StatisticsFile> predicate;
- if (snapshotIds == null) {
- predicate = statisticsFile -> true;
- } else {
- predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
- }
-
- List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
+ List<String> statisticsFiles =
+ ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, snapshotIds);
return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
new file mode 100644
index 0000000000..de4acd74a7
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ProcedureUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class ProcedureUtil {
+
+ private ProcedureUtil() {}
+
+ static PartitionStatisticsFile writePartitionStatsFile(
+ long snapshotId, String statsLocation, FileIO fileIO) {
+ PositionOutputStream positionOutputStream;
+ try {
+ positionOutputStream = fileIO.newOutputFile(statsLocation).create();
+ positionOutputStream.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return ImmutableGenericPartitionStatisticsFile.builder()
+ .snapshotId(snapshotId)
+ .fileSizeInBytes(42L)
+ .path(statsLocation)
+ .build();
+ }
+
+ static String statsFileLocation(String tableLocation) {
+ String statsFileName = "stats-file-" + UUID.randomUUID();
+ return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
+ }
+}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index c4b93c6d6a..7dacce5487 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -28,13 +28,12 @@ import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -445,7 +444,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
- String statsFileLocation1 = statsFileLocation(table.location());
+ String statsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -456,7 +455,7 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
- String statsFileLocation2 = statsFileLocation(table.location());
+ String statsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
@@ -475,18 +474,9 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);
table.refresh();
- List<StatisticsFile> statsWithSnapshotId1 =
- table.statisticsFiles().stream()
- .filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
- .collect(Collectors.toList());
- Assertions.assertThat(statsWithSnapshotId1)
- .as(
- "Statistics file entry in TableMetadata should be deleted for the snapshot %s",
- statisticsFile1.snapshotId())
- .isEmpty();
Assertions.assertThat(table.statisticsFiles())
.as(
- "Statistics file entry in TableMetadata should be present for the snapshot %s",
+ "Statistics file entry in TableMetadata should be present only for the snapshot %s",
statisticsFile2.snapshotId())
.extracting(StatisticsFile::snapshotId)
.containsExactly(statisticsFile2.snapshotId());
@@ -500,7 +490,58 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.exists();
}
- private StatisticsFile writeStatsFile(
+ @Test
+ public void testExpireSnapshotsWithPartitionStatisticFiles() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ String partitionStatsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile1 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();
+
+ sql("INSERT INTO %s SELECT 20, 'def'", tableName);
+ table.refresh();
+ String partitionStatsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile2 =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, table.io());
+ table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
+ catalogName, currentTimestamp, tableIdent);
+ Assertions.assertThat(output.get(0)[5])
+ .as("should be 1 deleted partition statistics file")
+ .isEqualTo(1L);
+
+ table.refresh();
+ Assertions.assertThat(table.partitionStatisticsFiles())
+ .as(
+ "partition statistics file entry in TableMetadata should be present only for the snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .extracting(PartitionStatisticsFile::snapshotId)
+ .containsExactly(partitionStatisticsFile2.snapshotId());
+
+ Assertions.assertThat(new File(partitionStatsFileLocation1))
+ .as(
+ "partition statistics file should not exist for snapshot %s",
+ partitionStatisticsFile1.snapshotId())
+ .doesNotExist();
+
+ Assertions.assertThat(new File(partitionStatsFileLocation2))
+ .as(
+ "partition statistics file should exist for snapshot %s",
+ partitionStatisticsFile2.snapshotId())
+ .exists();
+ }
+
+ private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
@@ -523,9 +564,4 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
.collect(ImmutableList.toImmutableList()));
}
}
-
- private String statsFileLocation(String tableLocation) {
- String statsFileName = "stats-file-" + UUID.randomUUID();
- return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
- }
}
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index b299817750..40adf30c37 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
@@ -530,6 +531,75 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
}
+ @Test
+ public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception {
+ sql(
+ "CREATE TABLE %s USING iceberg "
+ + "TBLPROPERTIES('format-version'='2') "
+ + "AS SELECT 10 int, 'abc' data",
+ tableName);
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location());
+ PartitionStatisticsFile partitionStatisticsFile =
+ ProcedureUtil.writePartitionStatsFile(
+ table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io());
+
+ commitPartitionStatsTxn(table, partitionStatisticsFile);
+
+ // wait to ensure files are old enough
+ waitUntilAfter(System.currentTimeMillis());
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be no orphan files").isEmpty();
+
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should exist")
+ .exists();
+
+ removePartitionStatsTxn(table, partitionStatisticsFile);
+
+ output =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp);
+ Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
+ Assertions.assertThat(Iterables.getOnlyElement(output))
+ .as("Deleted files")
+ .containsExactly("file:" + partitionStatsLocation);
+ Assertions.assertThat(new File(partitionStatsLocation))
+ .as("partition stats file should be deleted")
+ .doesNotExist();
+ }
+
+ private static void removePartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .removePartitionStatistics(partitionStatisticsFile.snapshotId())
+ .commit();
+ transaction.commitTransaction();
+ }
+
+ private static void commitPartitionStatsTxn(
+ Table table, PartitionStatisticsFile partitionStatisticsFile) {
+ Transaction transaction = table.newTransaction();
+ transaction
+ .updatePartitionStatistics()
+ .setPartitionStatistics(partitionStatisticsFile)
+ .commit();
+ transaction.commitTransaction();
+ }
+
@Test
public void testRemoveOrphanFilesProcedureWithPrefixMode()
throws NoSuchTableException, ParseException, IOException {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index d0e71a707d..53ce7418f3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
@@ -44,7 +43,6 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
-import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
@@ -196,14 +194,8 @@ abstract class BaseSparkAction<ThisT> {
}
protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
- Predicate<StatisticsFile> predicate;
- if (snapshotIds == null) {
- predicate = statisticsFile -> true;
- } else {
- predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
- }
-
- List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
+ List<String> statisticsFiles =
+ ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, snapshotIds);
return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
}