This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b67cec582 Spark 3.3: Add prefix mismatch mode for deleting orphan 
files (#5385)
b67cec582 is described below

commit b67cec582513ac9887b832ae0f33a4a209a453ce
Author: Karuppayya <[email protected]>
AuthorDate: Fri Jul 29 09:26:29 2022 -0700

    Spark 3.3: Add prefix mismatch mode for deleting orphan files (#5385)
---
 .../extensions/TestRemoveOrphanFilesProcedure.java |  93 +++++++++
 .../actions/DeleteOrphanFilesSparkAction.java      | 215 +++++++++++++++++++--
 .../iceberg/spark/actions/SetAccumulator.java      |  61 ++++++
 .../procedures/RemoveOrphanFilesProcedure.java     |  43 ++++-
 .../spark/actions/TestRemoveOrphanFilesAction.java | 121 ++++++++++++
 5 files changed, 512 insertions(+), 21 deletions(-)

diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index d9ccda555..7a2d5c56e 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
 
 import java.io.IOException;
+import java.net.URI;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
@@ -30,18 +31,26 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -437,4 +446,88 @@ public class TestRemoveOrphanFilesProcedure extends 
SparkExtensionsTestBase {
         resultDF.as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode()
+      throws NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    } else {
+      sql(
+          "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
LOCATION '%s'",
+          tableName, temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, 
"path/to/data-a.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DataFile dataFile2 =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, 
"path/to/data-b.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+    List<FilePathLastModifiedRecord> allFiles =
+        Lists.newArrayList(
+            new FilePathLastModifiedRecord(
+                new Path(originalPath, "path/to/data-a.parquet").toString(), 
lastModifiedTimestamp),
+            new FilePathLastModifiedRecord(
+                new Path(originalPath, "path/to/data-b.parquet").toString(), 
lastModifiedTimestamp),
+            new FilePathLastModifiedRecord(
+                ReachableFileUtil.versionHintLocation(table), 
lastModifiedTimestamp));
+
+    for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+      allFiles.add(new FilePathLastModifiedRecord(file, 
lastModifiedTimestamp));
+    }
+
+    for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+      allFiles.add(new FilePathLastModifiedRecord(manifest.path(), 
lastModifiedTimestamp));
+    }
+
+    Dataset<Row> compareToFileList =
+        spark
+            .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+            .withColumnRenamed("filePath", "file_path")
+            .withColumnRenamed("lastModified", "last_modified");
+    String fileListViewName = "files_view";
+    compareToFileList.createOrReplaceTempView(fileListViewName);
+    List<Object[]> orphanFiles =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "equal_schemes => map('file1', 'file'),"
+                + "file_list_view => '%s')",
+            catalogName, tableIdent, fileListViewName);
+    Assert.assertEquals(0, orphanFiles.size());
+
+    // Test with no equal schemes
+    AssertHelpers.assertThrows(
+        "Should complain about removing orphan files",
+        ValidationException.class,
+        "Conflicting authorities/schemes: [(file1, file)]",
+        () ->
+            sql(
+                "CALL %s.system.remove_orphan_files("
+                    + "table => '%s',"
+                    + "file_list_view => '%s')",
+                catalogName, tableIdent, fileListViewName));
+
+    // Drop table in afterEach has purge and fails due to invalid scheme 
"file1" used in this test
+    // Dropping the table here
+    sql("DROP TABLE %s", tableName);
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index dae08cfcb..7df3eaf94 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -21,13 +21,15 @@ package org.apache.iceberg.spark.actions;
 import static org.apache.iceberg.TableProperties.GC_ENABLED;
 import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.sql.Timestamp;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -49,26 +51,32 @@ import org.apache.iceberg.hadoop.HiddenPathFilter;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 /**
  * An action that removes orphan metadata, data and delete files by listing a 
given location and
@@ -95,17 +103,8 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
     implements DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF =
-      functions.udf(
-          (String path) -> {
-            int lastIndex = path.lastIndexOf(File.separator);
-            if (lastIndex == -1) {
-              return path;
-            } else {
-              return path.substring(lastIndex + 1);
-            }
-          },
-          DataTypes.StringType);
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = 
ImmutableMap.of("s3n,s3a", "s3");
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
@@ -118,6 +117,9 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
         }
       };
 
+  private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
+  private Map<String, String> equalAuthorities = Collections.emptyMap();
+  private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
   private String location = null;
   private long olderThanTimestamp = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
@@ -149,6 +151,27 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
     return this;
   }
 
+  @Override
+  public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode 
newPrefixMismatchMode) {
+    this.prefixMismatchMode = newPrefixMismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> 
newEqualSchemes) {
+    this.equalSchemes = Maps.newHashMap();
+    equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
+    equalSchemes.putAll(flattenMap(newEqualSchemes));
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFilesSparkAction equalAuthorities(Map<String, String> 
newEqualAuthorities) {
+    this.equalAuthorities = Maps.newHashMap();
+    equalAuthorities.putAll(flattenMap(newEqualAuthorities));
+    return this;
+  }
+
   @Override
   public DeleteOrphanFilesSparkAction location(String newLocation) {
     this.location = newLocation;
@@ -221,13 +244,9 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
     Dataset<Row> actualFileDF =
         compareToFileList == null ? buildActualFileDF() : 
filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = 
actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
     List<String> orphanFiles =
-        actualFileDF.join(validFileDF, joinCond, 
"leftanti").as(Encoders.STRING()).collectAsList();
+        findOrphanFiles(
+            spark(), actualFileDF, validFileDF, equalSchemes, 
equalAuthorities, prefixMismatchMode);
 
     Tasks.foreach(orphanFiles)
         .noRetry()
@@ -353,6 +372,114 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark,
+      Dataset<Row> actualFileDF,
+      Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes,
+      Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS =
+        actualFileDF.mapPartitions(
+            toFileMetadata(equalSchemes, equalAuthorities), 
Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS =
+        validFileDF.mapPartitions(
+            toFileMetadata(equalSchemes, equalAuthorities), 
Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = 
actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles =
+        actualFileMetadataDS
+            .joinWith(validFileMetadataDS, joinCond, "leftouter")
+            .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), 
Encoders.STRING())
+            .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!conflicts.value().isEmpty()) {
+      throw new ValidationException(
+          "Unable to determine whether certain files are orphan. "
+              + "Metadata references files that match listed/provided files 
except for authority/scheme. "
+              + "Please, inspect the conflicting authorities/schemes and 
provide which of them are equal "
+              + "by further configuring the action via equalSchemes() and 
equalAuthorities() methods. "
+              + "Set the prefix mismatch mode to 'NONE' to ignore remaining 
locations with conflicting "
+              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY 
confident that remaining conflicting "
+              + "authorities/schemes are different. It will be impossible to 
recover deleted files. "
+              + "Conflicting authorities/schemes: %s.",
+          conflicts.value());
+    }
+
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> map) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (map != null) {
+      for (String key : map.keySet()) {
+        String value = map.get(key);
+        for (String splitKey : COMMA.split(key)) {
+          flattenedMap.put(splitKey.trim(), value.trim());
+        }
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, 
String> findOrphanFiles(
+      PrefixMismatchMode mode, SetAccumulator<Pair<String, String>> conflicts) 
{
+    return rows -> {
+      Iterator<String> transformed =
+          Iterators.transform(
+              rows,
+              row -> {
+                FileMetadata actual = row._1;
+                FileMetadata valid = row._2;
+
+                if (valid == null) {
+                  return actual.location;
+                }
+
+                boolean schemeMatch =
+                    Strings.isNullOrEmpty(valid.scheme)
+                        || valid.scheme.equalsIgnoreCase(actual.scheme);
+                boolean authorityMatch =
+                    Strings.isNullOrEmpty(valid.authority)
+                        || valid.authority.equalsIgnoreCase(actual.authority);
+
+                if ((!schemeMatch || !authorityMatch) && mode == 
PrefixMismatchMode.DELETE) {
+                  return actual.location;
+                } else {
+                  if (!schemeMatch) {
+                    conflicts.add(Pair.of(valid.scheme, actual.scheme));
+                  }
+                  if (!authorityMatch) {
+                    conflicts.add(Pair.of(valid.authority, actual.authority));
+                  }
+                }
+
+                return null;
+              });
+      return Iterators.filter(transformed, Objects::nonNull);
+    };
+  }
+
+  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+      Map<String, String> equalSchemesMap, Map<String, String> 
equalAuthoritiesMap) {
+    return rows ->
+        Iterators.transform(
+            rows,
+            row -> {
+              String location = row.getString(0);
+              URI uri = new Path(location).toUri();
+              String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), 
uri.getScheme());
+              String authority =
+                  equalAuthoritiesMap.getOrDefault(uri.getAuthority(), 
uri.getAuthority());
+              return new FileMetadata(scheme, authority, uri.getPath(), 
location);
+            });
+  }
+
   /**
    * A {@link PathFilter} that filters out hidden path, but does not filter 
out paths that would be
    * marked as hidden by {@link HiddenPathFilter} due to a partition field 
that starts with one of
@@ -395,4 +522,52 @@ public class DeleteOrphanFilesSparkAction extends 
BaseSparkAction<DeleteOrphanFi
           : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  public static class FileMetadata implements Serializable {
+    private String scheme;
+    private String authority;
+    private String path;
+    private String location;
+
+    public FileMetadata(String scheme, String authority, String path, String 
location) {
+      this.scheme = scheme;
+      this.authority = authority;
+      this.path = path;
+      this.location = location;
+    }
+
+    public FileMetadata() {}
+
+    public void setScheme(String scheme) {
+      this.scheme = scheme;
+    }
+
+    public void setAuthority(String authority) {
+      this.authority = authority;
+    }
+
+    public void setPath(String path) {
+      this.path = path;
+    }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public String getScheme() {
+      return scheme;
+    }
+
+    public String getAuthority() {
+      return authority;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
new file mode 100644
index 000000000..745169fc1
--- /dev/null
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SetAccumulator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.util.AccumulatorV2;
+
+public class SetAccumulator<T> extends AccumulatorV2<T, java.util.Set<T>> {
+
+  private final Set<T> set = Collections.synchronizedSet(Sets.newHashSet());
+
+  @Override
+  public boolean isZero() {
+    return set.isEmpty();
+  }
+
+  @Override
+  public AccumulatorV2<T, Set<T>> copy() {
+    SetAccumulator<T> newAccumulator = new SetAccumulator<>();
+    newAccumulator.set.addAll(set);
+    return newAccumulator;
+  }
+
+  @Override
+  public void reset() {
+    set.clear();
+  }
+
+  @Override
+  public void add(T v) {
+    set.add(v);
+  }
+
+  @Override
+  public void merge(AccumulatorV2<T, Set<T>> other) {
+    set.addAll(other.value());
+  }
+
+  @Override
+  public Set<T> value() {
+    return set;
+  }
+}
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 00b68c289..24377c32d 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -18,11 +18,14 @@
  */
 package org.apache.iceberg.spark.procedures;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
@@ -36,6 +39,7 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
 
 /**
  * A procedure that removes orphan files in a table.
@@ -51,7 +55,10 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
         ProcedureParameter.optional("location", DataTypes.StringType),
         ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
         ProcedureParameter.optional("max_concurrent_deletes", 
DataTypes.IntegerType),
-        ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+        ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+        ProcedureParameter.optional("equal_schemes", STRING_MAP),
+        ProcedureParameter.optional("equal_authorities", STRING_MAP),
+        ProcedureParameter.optional("prefix_mismatch_mode", 
DataTypes.StringType),
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -97,6 +104,33 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0,  value: " + 
maxConcurrentDeletes);
 
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    if (!args.isNullAt(6)) {
+      args.getMap(6)
+          .foreach(
+              DataTypes.StringType,
+              DataTypes.StringType,
+              (k, v) -> {
+                equalSchemes.put(k.toString(), v.toString());
+                return BoxedUnit.UNIT;
+              });
+    }
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    if (!args.isNullAt(7)) {
+      args.getMap(7)
+          .foreach(
+              DataTypes.StringType,
+              DataTypes.StringType,
+              (k, v) -> {
+                equalSchemes.put(k.toString(), v.toString());
+                return BoxedUnit.UNIT;
+              });
+    }
+
+    PrefixMismatchMode prefixMismatchMode =
+        args.isNullAt(8) ? null : 
PrefixMismatchMode.fromString(args.getString(8));
+
     return withIcebergTable(
         tableIdent,
         table -> {
@@ -126,6 +160,13 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
             action.compareToFileList(spark().table(fileListView));
           }
 
+          action.equalSchemes(equalSchemes);
+          action.equalAuthorities(equalAuthorities);
+
+          if (prefixMismatchMode != null) {
+            action.prefixMismatchMode(prefixMismatchMode);
+          }
+
           DeleteOrphanFiles.Result result = action.execute();
 
           return toOutputRows(result);
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index cda48980b..20f995731 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -873,4 +874,124 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     }
     return current;
   }
+
+  @Test
+  public void testPathsWithExtraSlashes() {
+    List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+    List<String> actualFiles = 
Lists.newArrayList("file:///dir1/////dir2///file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithValidFileHavingNoAuthority() {
+    List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+    List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithActualFileHavingNoAuthority() {
+    List<String> validFiles = 
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+    List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+    executeTest(validFiles, actualFiles, Lists.newArrayList());
+  }
+
+  @Test
+  public void testPathsWithEqualSchemes() {
+    List<String> validFiles = 
Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
+    List<String> actualFiles = 
Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
+    AssertHelpers.assertThrows(
+        "Test remove orphan files with equal schemes",
+        ValidationException.class,
+        "Conflicting authorities/schemes: [(scheme1, scheme2)]",
+        () ->
+            executeTest(
+                validFiles,
+                actualFiles,
+                Lists.newArrayList(),
+                ImmutableMap.of(),
+                ImmutableMap.of(),
+                DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    equalSchemes.put("scheme1", "scheme");
+    equalSchemes.put("scheme2", "scheme");
+    executeTest(
+        validFiles,
+        actualFiles,
+        Lists.newArrayList(),
+        equalSchemes,
+        ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+  }
+
+  @Test
+  public void testPathsWithEqualAuthorities() {
+    List<String> validFiles = 
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+    List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+    AssertHelpers.assertThrows(
+        "Test remove orphan files with equal authorities",
+        ValidationException.class,
+        "Conflicting authorities/schemes: [(servicename1, servicename2)]",
+        () ->
+            executeTest(
+                validFiles,
+                actualFiles,
+                Lists.newArrayList(),
+                ImmutableMap.of(),
+                ImmutableMap.of(),
+                DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    equalAuthorities.put("servicename1", "servicename");
+    equalAuthorities.put("servicename2", "servicename");
+    executeTest(
+        validFiles,
+        actualFiles,
+        Lists.newArrayList(),
+        ImmutableMap.of(),
+        equalAuthorities,
+        DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+  }
+
+  @Test
+  public void testRemoveOrphanFileActionWithDeleteMode() {
+    List<String> validFiles = 
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+    List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+
+    executeTest(
+        validFiles,
+        actualFiles,
+        Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1"),
+        ImmutableMap.of(),
+        ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.DELETE);
+  }
+
+  private void executeTest(
+      List<String> validFiles, List<String> actualFiles, List<String> 
expectedOrphanFiles) {
+    executeTest(
+        validFiles,
+        actualFiles,
+        expectedOrphanFiles,
+        ImmutableMap.of(),
+        ImmutableMap.of(),
+        DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
+  }
+
+  private void executeTest(
+      List<String> validFiles,
+      List<String> actualFiles,
+      List<String> expectedOrphanFiles,
+      Map<String, String> equalSchemes,
+      Map<String, String> equalAuthorities,
+      DeleteOrphanFiles.PrefixMismatchMode mode) {
+    Dataset<Row> validFilesDF = spark.createDataset(validFiles, 
Encoders.STRING()).toDF();
+    Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, 
Encoders.STRING()).toDF();
+
+    List<String> orphanFiles =
+        DeleteOrphanFilesSparkAction.findOrphanFiles(
+            spark, actualFilesDF, validFilesDF, equalSchemes, 
equalAuthorities, mode);
+    Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+  }
 }

Reply via email to