This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 760853812b Spark 3.1: Add prefix mismatch mode for deleting orphan
files (#7653)
760853812b is described below
commit 760853812b347fd53805ce53b5aed561cd4ad577
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Jul 19 04:54:43 2023 +0800
Spark 3.1: Add prefix mismatch mode for deleting orphan files (#7653)
---
.../extensions/TestRemoveOrphanFilesProcedure.java | 106 +++++++++++
.../actions/BaseDeleteOrphanFilesSparkAction.java | 206 ++++++++++++++++++++-
.../iceberg/spark/actions/SetAccumulator.java | 61 ++++++
.../procedures/RemoveOrphanFilesProcedure.java | 44 ++++-
.../spark/actions/TestRemoveOrphanFilesAction.java | 120 ++++++++++++
5 files changed, 530 insertions(+), 7 deletions(-)
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 7d5b5f88a1..197e80fbd1 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -32,10 +32,16 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -49,6 +55,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
@@ -462,4 +470,102 @@ public class TestRemoveOrphanFilesProcedure extends
SparkExtensionsTestBase {
file.isDirectory(), "Table location '%s' does not point to a
directory", location);
return file;
}
+
+ @Test
+ public void testRemoveOrphanFilesProcedureWithPrefixMode()
+ throws NoSuchTableException, ParseException, IOException {
+ if (catalogName.equals("testhadoop")) {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ } else {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
LOCATION '%s'",
+ tableName, temp.newFolder().toURI().toString());
+ }
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ String location = table.location();
+ Path originalPath = new Path(location);
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ Path originalDataPath = new Path(originalPath, "data");
+ localFs.mkdirs(originalDataPath);
+ localFs.create(new Path(originalDataPath, "data-a.parquet")).close();
+ localFs.create(new Path(originalDataPath, "data-b.parquet")).close();
+
+ URI uri = originalPath.toUri();
+ Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+ DataFile dataFile1 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath,
"data/data-a.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath,
"data/data-b.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Timestamp currentTimestamp =
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+ Assert.assertEquals(
+ 0,
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s',"
+ + "equal_schemes => map('file1', 'file'))",
+ catalogName, tableIdent, currentTimestamp)
+ .size());
+
+ // Test with no equal schemes
+ AssertHelpers.assertThrows(
+ "Should complain about removing orphan files",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(file1, file)]",
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s')",
+ catalogName, tableIdent, currentTimestamp));
+
+ AssertHelpers.assertThrows(
+ "Should complain about removing orphan files",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(file1, file)]",
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s',"
+ + "prefix_mismatch_mode => 'error')",
+ catalogName, tableIdent, currentTimestamp));
+
+ Assert.assertEquals(
+ 2,
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s',"
+ + "prefix_mismatch_mode => 'delete')",
+ catalogName, tableIdent, currentTimestamp)
+ .size());
+
+ Assert.assertEquals(
+ 0,
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "older_than => TIMESTAMP '%s',"
+ + "prefix_mismatch_mode => 'ignore')",
+ catalogName, tableIdent, currentTimestamp)
+ .size());
+
+ // Drop table in afterEach has purge and fails due to invalid scheme
"file1" used in this test
+ // Dropping the table here
+ sql("DROP TABLE %s", tableName);
+ }
}
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index 72b6268026..b23de35afb 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -23,8 +23,13 @@ import static
org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -39,13 +44,21 @@ import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
@@ -58,6 +71,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Tuple2;
/**
* An action that removes orphan metadata, data and delete files by listing a
given location and
@@ -79,6 +93,8 @@ public class BaseDeleteOrphanFilesSparkAction
implements DeleteOrphanFiles {
private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Map<String, String> EQUAL_SCHEMES_DEFAULT =
ImmutableMap.of("s3n,s3a", "s3");
private static final UserDefinedFunction filenameUDF =
functions.udf(
(String path) -> {
@@ -97,6 +113,9 @@ public class BaseDeleteOrphanFilesSparkAction
private final int partitionDiscoveryParallelism;
private final Table table;
+ private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
+ private Map<String, String> equalAuthorities = Collections.emptyMap();
+ private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3);
private Consumer<String> deleteFunc =
@@ -134,6 +153,29 @@ public class BaseDeleteOrphanFilesSparkAction
return this;
}
+ @Override
+ public BaseDeleteOrphanFilesSparkAction prefixMismatchMode(
+ PrefixMismatchMode newPrefixMismatchMode) {
+ this.prefixMismatchMode = newPrefixMismatchMode;
+ return this;
+ }
+
+ @Override
+ public BaseDeleteOrphanFilesSparkAction equalSchemes(Map<String, String>
newEqualSchemes) {
+ this.equalSchemes = Maps.newHashMap();
+ equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
+ equalSchemes.putAll(flattenMap(newEqualSchemes));
+ return this;
+ }
+
+ @Override
+ public BaseDeleteOrphanFilesSparkAction equalAuthorities(
+ Map<String, String> newEqualAuthorities) {
+ this.equalAuthorities = Maps.newHashMap();
+ equalAuthorities.putAll(flattenMap(newEqualAuthorities));
+ return this;
+ }
+
@Override
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
this.location = newLocation;
@@ -174,13 +216,9 @@ public class BaseDeleteOrphanFilesSparkAction
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();
- Column actualFileName = filenameUDF.apply(actualFileDF.col("file_path"));
- Column validFileName = filenameUDF.apply(validFileDF.col("file_path"));
- Column nameEqual = actualFileName.equalTo(validFileName);
- Column actualContains =
actualFileDF.col("file_path").contains(validFileDF.col("file_path"));
- Column joinCond = nameEqual.and(actualContains);
List<String> orphanFiles =
- actualFileDF.join(validFileDF, joinCond,
"leftanti").as(Encoders.STRING()).collectAsList();
+ findOrphanFiles(
+ spark(), actualFileDF, validFileDF, equalSchemes,
equalAuthorities, prefixMismatchMode);
Tasks.foreach(orphanFiles)
.noRetry()
@@ -296,4 +334,160 @@ public class BaseDeleteOrphanFilesSparkAction
return files.iterator();
};
}
+
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+ SparkSession spark,
+ Dataset<Row> actualFileDF,
+ Dataset<Row> validFileDF,
+ Map<String, String> equalSchemes,
+ Map<String, String> equalAuthorities,
+ PrefixMismatchMode prefixMismatchMode) {
+ Dataset<FileMetadata> actualFileMetadataDS =
+ actualFileDF.mapPartitions(
+ toFileMetadata(equalSchemes, equalAuthorities),
Encoders.bean(FileMetadata.class));
+ Dataset<FileMetadata> validFileMetadataDS =
+ validFileDF.mapPartitions(
+ toFileMetadata(equalSchemes, equalAuthorities),
Encoders.bean(FileMetadata.class));
+
+ SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+ spark.sparkContext().register(conflicts);
+
+ Column joinCond =
actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+ List<String> orphanFiles =
+ actualFileMetadataDS
+ .joinWith(validFileMetadataDS, joinCond, "leftouter")
+ .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts),
Encoders.STRING())
+ .collectAsList();
+
+ if (prefixMismatchMode == PrefixMismatchMode.ERROR &&
!conflicts.value().isEmpty()) {
+ throw new ValidationException(
+ "Unable to determine whether certain files are orphan. "
+ + "Metadata references files that match listed/provided files
except for authority/scheme. "
+ + "Please, inspect the conflicting authorities/schemes and
provide which of them are equal "
+ + "by further configuring the action via equalSchemes() and
equalAuthorities() methods. "
+ + "Set the prefix mismatch mode to 'IGNORE' to ignore remaining
locations with conflicting "
+ + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY
confident that remaining conflicting "
+ + "authorities/schemes are different. It will be impossible to
recover deleted files. "
+ + "Conflicting authorities/schemes: %s.",
+ conflicts.value());
+ }
+
+ return orphanFiles;
+ }
+
+ private static Map<String, String> flattenMap(Map<String, String> map) {
+ Map<String, String> flattenedMap = Maps.newHashMap();
+ if (map != null) {
+ for (String key : map.keySet()) {
+ String value = map.get(key);
+ for (String splitKey : COMMA.split(key)) {
+ flattenedMap.put(splitKey.trim(), value.trim());
+ }
+ }
+ }
+ return flattenedMap;
+ }
+
+ private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>,
String> findOrphanFiles(
+ PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts)
{
+ return rows -> {
+ Iterator<String> transformed =
+ Iterators.transform(
+ rows,
+ row -> {
+ FileMetadata actual = row._1;
+ FileMetadata valid = row._2;
+
+ if (valid == null) {
+ return actual.location;
+ }
+
+ boolean schemeMatch =
+ Strings.isNullOrEmpty(valid.scheme)
+ || valid.scheme.equalsIgnoreCase(actual.scheme);
+ boolean authorityMatch =
+ Strings.isNullOrEmpty(valid.authority)
+ || valid.authority.equalsIgnoreCase(actual.authority);
+
+ if ((!schemeMatch || !authorityMatch) && mode ==
PrefixMismatchMode.DELETE) {
+ return actual.location;
+ } else {
+ if (!schemeMatch) {
+ conflicts.add(Pair.of(valid.scheme, actual.scheme));
+ }
+ if (!authorityMatch) {
+ conflicts.add(Pair.of(valid.authority, actual.authority));
+ }
+ }
+
+ return null;
+ });
+ return Iterators.filter(transformed, Objects::nonNull);
+ };
+ }
+
+ private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+ Map<String, String> equalSchemesMap, Map<String, String>
equalAuthoritiesMap) {
+ return rows ->
+ Iterators.transform(
+ rows,
+ row -> {
+ String location = row.getString(0);
+ URI uri = new Path(location).toUri();
+ String scheme = equalSchemesMap.getOrDefault(uri.getScheme(),
uri.getScheme());
+ String authority =
+ equalAuthoritiesMap.getOrDefault(uri.getAuthority(),
uri.getAuthority());
+ return new FileMetadata(scheme, authority, uri.getPath(),
location);
+ });
+ }
+
+ public static class FileMetadata implements Serializable {
+ private String scheme;
+ private String authority;
+ private String path;
+ private String location;
+
+ public FileMetadata(String scheme, String authority, String path, String
location) {
+ this.scheme = scheme;
+ this.authority = authority;
+ this.path = path;
+ this.location = location;
+ }
+
+ public FileMetadata() {}
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public void setAuthority(String authority) {
+ this.authority = authority;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public String getAuthority() {
+ return authority;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+ }
}
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
new file mode 100644
index 0000000000..745169fc1e
--- /dev/null
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, java.util.Set<T>> {
+
+ private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+ @Override
+ public boolean isZero() {
+ return set.isEmpty();
+ }
+
+ @Override
+ public AccumulatorV2<T, Set<T>> copy() {
+ SetAccumulator<T> newAccumulator = new SetAccumulator<>();
+ newAccumulator.set.addAll(set);
+ return newAccumulator;
+ }
+
+ @Override
+ public void reset() {
+ set.clear();
+ }
+
+ @Override
+ public void add(T v) {
+ set.add(v);
+ }
+
+ @Override
+ public void merge(AccumulatorV2<T, Set<T>> other) {
+ set.addAll(other.value());
+ }
+
+ @Override
+ public Set<T> value() {
+ return set;
+ }
+}
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index b41ada1754..f91d593071 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -18,14 +18,17 @@
*/
package org.apache.iceberg.spark.procedures;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.actions.SparkActions;
@@ -40,6 +43,7 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
/**
* A procedure that removes orphan files in a table.
@@ -54,7 +58,10 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
- ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType)
+ ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
+ ProcedureParameter.optional("equal_schemes", STRING_MAP),
+ ProcedureParameter.optional("equal_authorities", STRING_MAP),
+ ProcedureParameter.optional("prefix_mismatch_mode",
DataTypes.StringType),
};
private static final StructType OUTPUT_TYPE =
@@ -87,6 +94,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure
{
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
@@ -99,6 +107,33 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
"max_concurrent_deletes should have value > 0, value: %s",
maxConcurrentDeletes);
+ Map<String, String> equalSchemes = Maps.newHashMap();
+ if (!args.isNullAt(5)) {
+ args.getMap(5)
+ .foreach(
+ DataTypes.StringType,
+ DataTypes.StringType,
+ (k, v) -> {
+ equalSchemes.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ Map<String, String> equalAuthorities = Maps.newHashMap();
+ if (!args.isNullAt(6)) {
+ args.getMap(6)
+ .foreach(
+ DataTypes.StringType,
+ DataTypes.StringType,
+ (k, v) -> {
+ equalAuthorities.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ PrefixMismatchMode prefixMismatchMode =
+ args.isNullAt(7) ? null :
PrefixMismatchMode.fromString(args.getString(7));
+
return withIcebergTable(
tableIdent,
table -> {
@@ -124,6 +159,13 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
action.executeDeleteWith(removeService(maxConcurrentDeletes));
}
+ action.equalSchemes(equalSchemes);
+ action.equalAuthorities(equalAuthorities);
+
+ if (prefixMismatchMode != null) {
+ action.prefixMismatchMode(prefixMismatchMode);
+ }
+
DeleteOrphanFiles.Result result = action.execute();
return toOutputRows(result);
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 8877fea51c..19456643b8 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -734,4 +734,124 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.isEqualTo(statsLocation.toURI().toString());
Assertions.assertThat(statsLocation.exists()).as("stats file should be
deleted").isFalse();
}
+
+ @Test
+ public void testPathsWithExtraSlashes() {
+ List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+ List<String> actualFiles =
Lists.newArrayList("file:///dir1/////dir2///file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithValidFileHavingNoAuthority() {
+ List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+ List<String> actualFiles =
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithActualFileHavingNoAuthority() {
+ List<String> validFiles =
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+ List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+ executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithEqualSchemes() {
+ List<String> validFiles =
Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
+ List<String> actualFiles =
Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
+ AssertHelpers.assertThrows(
+ "Test remove orphan files with equal schemes",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(scheme1, scheme2)]",
+ () ->
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+ Map<String, String> equalSchemes = Maps.newHashMap();
+ equalSchemes.put("scheme1", "scheme");
+ equalSchemes.put("scheme2", "scheme");
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ equalSchemes,
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testPathsWithEqualAuthorities() {
+ List<String> validFiles =
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+ List<String> actualFiles =
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+ AssertHelpers.assertThrows(
+ "Test remove orphan files with equal authorities",
+ ValidationException.class,
+ "Conflicting authorities/schemes: [(servicename1, servicename2)]",
+ () ->
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+ Map<String, String> equalAuthorities = Maps.newHashMap();
+ equalAuthorities.put("servicename1", "servicename");
+ equalAuthorities.put("servicename2", "servicename");
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ equalAuthorities,
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testRemoveOrphanFileActionWithDeleteMode() {
+ List<String> validFiles =
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+ List<String> actualFiles =
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1"),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.DELETE);
+ }
+
+ private void executeTest(
+ List<String> validFiles, List<String> actualFiles, List<String>
expectedOrphanFiles) {
+ executeTest(
+ validFiles,
+ actualFiles,
+ expectedOrphanFiles,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
+ }
+
+ private void executeTest(
+ List<String> validFiles,
+ List<String> actualFiles,
+ List<String> expectedOrphanFiles,
+ Map<String, String> equalSchemes,
+ Map<String, String> equalAuthorities,
+ DeleteOrphanFiles.PrefixMismatchMode mode) {
+ Dataset<Row> validFilesDF = spark.createDataset(validFiles,
Encoders.STRING()).toDF();
+ Dataset<Row> actualFilesDF = spark.createDataset(actualFiles,
Encoders.STRING()).toDF();
+
+ List<String> orphanFiles =
+ BaseDeleteOrphanFilesSparkAction.findOrphanFiles(
+ spark, actualFilesDF, validFilesDF, equalSchemes,
equalAuthorities, mode);
+ Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+ }
}