aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r923940050


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{

Review Comment:
   nit: This class uses `newXXX` in other setters, `mismatchMode` -> 
`newPrefixMismatchMode` 
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,15 +100,10 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
 
+  private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
+  private Map<String, String> equalSchemes;

Review Comment:
   We should add defaults here. You could add a constant and reuse it as a 
default value and in the setter.
   
   ```
   private static final Map<String, String> EQUAL_SCHEMES_DEFAULT =
       ImmutableMap.of("s3n,s3a", "s3");
   ```
   
   ```
   private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");

Review Comment:
   I don't think the var names are correct anymore. What about having `joinCond` to shorten the line below?
   
   ```
   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
   
   List<String> orphanFiles = actualFileMetadataDS
       .joinWith(validFileMetadataDS, joinCond, "leftouter")
       .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, conflicts), Encoders.STRING())
       .collectAsList();
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{
+    this.prefixMismatchMode = mismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalSchemes(Map<String, String> schemes) {

Review Comment:
   nit: `schemes` -> `newEqualSchemes`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{
+    this.prefixMismatchMode = mismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalSchemes(Map<String, String> schemes) {
+    this.equalSchemes = schemes;

Review Comment:
   Since we also need to create defensive copies of the provided maps, we can 
call `flattenMap` in setters and remove the call in `execute`.
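   
   For illustration, a minimal sketch of such a setter (assuming the `flattenMap` helper from this PR and the `newEqualSchemes` parameter name suggested above; the default-layering discussed further down is omitted here):
   
   ```
   @Override
   public DeleteOrphanFiles equalSchemes(Map<String, String> newEqualSchemes) {
     // flattenMap builds a fresh HashMap, so it doubles as a defensive copy
     this.equalSchemes = flattenMap(newEqualSchemes);
     return this;
   }
   ```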



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{
+    this.prefixMismatchMode = mismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalSchemes(Map<String, String> schemes) {
+    this.equalSchemes = schemes;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalAuthorities(Map<String, String> authorities) {

Review Comment:
   nit: `authorities` -> `newEqualAuthorities`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{
+    this.prefixMismatchMode = mismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalSchemes(Map<String, String> schemes) {
+    this.equalSchemes = schemes;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalAuthorities(Map<String, String> authorities) {
+    this.equalAuthorities = authorities;

Review Comment:
   ```
   this.equalAuthorities = flattenMap(newEqualAuthorities);
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,15 +100,10 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
 
+  private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
+  private Map<String, String> equalSchemes;
+  private Map<String, String> equalAuthorities;

Review Comment:
   Let's default it to `ImmutableMap.of()`.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -89,4 +125,14 @@ interface Result {
      */
     Iterable<String> orphanFileLocations();
   }
+
+  /**

Review Comment:
   What about this?
   
   ```
   /**
    * Defines the action behavior when location prefixes (scheme/authority) mismatch.
    * <p>
    * {@link #ERROR} - throw an exception.
    * {@link #IGNORE} - no action.
    * {@link #DELETE} - delete files.
    */
   ```
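   
   For reference, a sketch of where this doc would sit (presumably a nested enum inside `DeleteOrphanFiles`; the constant names are taken from this PR):
   
   ```
   enum PrefixMismatchMode {
     ERROR, IGNORE, DELETE
   }
   ```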



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +232,13 @@ private DeleteOrphanFiles.Result doExecute() {
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? 
buildActualFileDF() : filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = 
actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, 
"leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    List<String> orphanFiles =
+        findOrphanFiles(spark(),

Review Comment:
   I think I'd prefer one of the formatting variants mentioned [here](https://github.com/apache/iceberg/pull/4652#discussion_r921342889).



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,15 +100,10 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
 
+  private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;

Review Comment:
   There is a block of mutable variables below. I think we should move these 
new variables there, keeping an empty line between the blocks with mutable and 
immutable vars.
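   
   For illustration, a sketch of that layout (it assumes the defaults suggested elsewhere in this review; the other existing mutable fields of the action are elided):
   
   ```
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
   
   // ... existing mutable fields of the action ...
   private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
   private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
   private Map<String, String> equalAuthorities = ImmutableMap.of();
   ```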



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,41 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**

Review Comment:
   Javadoc doesn't respect new lines, which is why explicit `<p>` tags are required. Also, this is a very fragile area, so I'd add as much documentation as possible. What about this?
   
   ```
     /**
      * Passes a prefix mismatch mode that determines how this action should handle situations when
      * the metadata references files that match listed/provided files except for authority/scheme.
      * <p>
      * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
      * which means an exception is thrown whenever there is a mismatch in authority/scheme.
      * It's the recommended mismatch mode and should be changed only in some rare circumstances.
      * If there is a mismatch, use {@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)}
      * to resolve conflicts by providing equivalent schemes and authorities. If it is impossible
      * to determine whether the conflicting authorities/schemes are equal, set the prefix mismatch
      * mode to "IGNORE" to skip files with mismatches. If you have manually inspected all conflicting
      * authorities/schemes, provided equivalent schemes/authorities and are absolutely confident
      * the remaining ones are different, set the prefix mismatch mode to "DELETE" to consider files
      * with mismatches as orphan. It will be impossible to recover files after deletion,
      * so the "DELETE" prefix mismatch mode must be used with extreme caution.
      *
      * @param mode mode for handling prefix mismatches
      * @return this for method chaining
      */
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +144,24 @@ public DeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService executorSe
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mismatchMode) 
{
+    this.prefixMismatchMode = mismatchMode;
+    return this;
+  }
+
+  @Override
+  public DeleteOrphanFiles equalSchemes(Map<String, String> schemes) {
+    this.equalSchemes = schemes;

Review Comment:
   Whenever someone provides a map of equal schemes, we have to apply it on top 
of default values.
   
   ```
   this.equalSchemes = Maps.newHashMap();
   equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
   equalSchemes.putAll(flattenMap(newEqualSchemes));
   return this;
   ```



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,41 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * The allowed modes are "IGNORE", "ERROR", "DELETE"
+   * Default prefixMismatchMode is "ERROR".
+   *
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement prefixMismatchMode");
+  }
+
+  /**

Review Comment:
   The doc here should be updated to reflect that we support commas in keys now.
   
   ```
   /**
    * Passes schemes that should be considered equal.
    * <p>
    * The key may include a comma-separated list of schemes. For instance, Map("s3a,s3,s3n", "s3").
    *
    * @param equalSchemes list of equal schemes
    * @return this for method chaining
    */
   ```



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,41 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * The allowed modes are "IGNORE", "ERROR", "DELETE"
+   * Default prefixMismatchMode is "ERROR".
+   *
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles prefixMismatchMode(PrefixMismatchMode mode) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement prefixMismatchMode");
+  }
+
+  /**
+   * Pass a Map with a String key and comma separated list of schemes  to be 
considered equal when finding orphan files.
+   * example: Map("s3", "s3a, s3, s3n")
+   *
+   * @param equalSchemes list of equal schemes
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles equalSchemes(Map<String, String> equalSchemes) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement equalSchemes");
+  }
+
+  /**

Review Comment:
   Same here.
   
   ```
   /**
    * Passes authorities that should be considered equal.
    * <p>
    * The key may include a comma-separated list of authorities. For instance, Map("s1name,s2name", "servicename").
    *
    * @param equalAuthorities list of equal authorities
    * @return this for method chaining
    */
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);

Review Comment:
   These two calls will be redundant if you call `flattenMap` in setters.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();

Review Comment:
   nit: `setAccumulator` -> `conflicts`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,

Review Comment:
   This method currently uses the preferred formatting in Iceberg, but I see that other methods in this class use other formatting because the names are long. I don't have a preference. Up to you if you want to make it consistent with other methods.
   
   <img width="619" alt="image" src="https://user-images.githubusercontent.com/6235869/179638774-f49a50a8-f22e-47cb-a318-2008ca5c0b95.png">
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();
+    spark.sparkContext().register(setAccumulator);
+
+    List<String> orphanFiles = 
normalizedActualFileDF.joinWith(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, 
setAccumulator), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all 
orphan files." +
+          " Found file paths that have same file path but different 
authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> 
toBeFlattenedMap) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (toBeFlattenedMap != null) {
+      for (String key : toBeFlattenedMap.keySet()) {
+        String value = toBeFlattenedMap.get(key);
+        Arrays.stream(key.split(",")).map(String::trim)

Review Comment:
   We are using Splitter from Guava in a lot of places. What about using it 
here too?
   
   ```
   private static final Splitter COMMA = Splitter.on(",");
   ```
   
   ```
   if (map != null) {
     for (String key : map.keySet()) {
       String value = map.get(key);
       for (String splitKey : COMMA.split(key)) {
         flattenedMap.put(splitKey.trim(), value);
       }
     }
   }
   ```
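   
   Putting those two fragments together, one possible shape for the helper (just a sketch; it also applies the `map` parameter rename suggested further down in this review):
   
   ```
   private static final Splitter COMMA = Splitter.on(",");
   
   private static Map<String, String> flattenMap(Map<String, String> map) {
     Map<String, String> flattenedMap = Maps.newHashMap();
     if (map != null) {
       for (String key : map.keySet()) {
         String value = map.get(key);
         // expand comma-separated keys such as "s3a,s3n" into one entry per scheme
         for (String splitKey : COMMA.split(key)) {
           flattenedMap.put(splitKey.trim(), value);
         }
       }
     }
     return flattenedMap;
   }
   ```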



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();
+    spark.sparkContext().register(setAccumulator);
+
+    List<String> orphanFiles = 
normalizedActualFileDF.joinWith(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, 
setAccumulator), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all 
orphan files." +

Review Comment:
   What about giving more details and instructions to the user? Something like 
this?
   
   ```
   throw new ValidationException("Unable to determine whether certain files are orphan. " +
       "Metadata references files that match listed/provided files except for authority/scheme. " +
       "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
       "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
       "Set the prefix mismatch mode to 'IGNORE' to ignore remaining locations with conflicting " +
       "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
       "authorities/schemes are different. It will be impossible to recover deleted files. " +
       "Conflicting authorities/schemes: %s.", conflicts.value());
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(

Review Comment:
   I think I liked `FileMetadata` more. I'd also consider changing vars a bit 
and dropping `as` calls.
   
   ```
   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
       toFileMetadata(equalSchemes, equalAuthorities),
       Encoders.bean(FileMetadata.class));
   
   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
       toFileMetadata(equalSchemes, equalAuthorities),
       Encoders.bean(FileMetadata.class));
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();
+    spark.sparkContext().register(setAccumulator);
+
+    List<String> orphanFiles = 
normalizedActualFileDF.joinWith(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, 
setAccumulator), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all 
orphan files." +
+          " Found file paths that have same file path but different 
authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> 
toBeFlattenedMap) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (toBeFlattenedMap != null) {
+      for (String key : toBeFlattenedMap.keySet()) {
+        String value = toBeFlattenedMap.get(key);
+        Arrays.stream(key.split(",")).map(String::trim)
+            .forEach(splitKey -> flattenedMap.put(splitKey, value));
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<PathProxy, PathProxy>, String> 
findOrphanFilesMapPartitions(
+      PrefixMismatchMode mode,
+      SetAccumulator<Pair<String, String>> conflicts) {
+    return rows -> {
+      Iterator<String> transformed = Iterators.transform(rows, row -> {
+        PathProxy actual = row._1;
+        PathProxy valid = row._2;
+        if (valid == null) {
+          return actual.getFilePath();
+        }
+        boolean schemeMatch = Strings.isNullOrEmpty(valid.getScheme()) ||

Review Comment:
   I think you can access fields directly in this class to make the code 
shorter.
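   
   For illustration only, a sketch of the shorter form. It assumes the nested class exposes a `scheme` field that is visible from the enclosing class, and it guesses the right-hand side of the `||`, which is cut off in the hunk above:
   
   ```
   // hypothetical field access instead of valid.getScheme()/actual.getScheme()
   boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) || valid.scheme.equals(actual.scheme);
   ```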



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();
+    spark.sparkContext().register(setAccumulator);
+
+    List<String> orphanFiles = 
normalizedActualFileDF.joinWith(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, 
setAccumulator), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all 
orphan files." +
+          " Found file paths that have same file path but different 
authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> 
toBeFlattenedMap) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (toBeFlattenedMap != null) {
+      for (String key : toBeFlattenedMap.keySet()) {
+        String value = toBeFlattenedMap.get(key);
+        Arrays.stream(key.split(",")).map(String::trim)
+            .forEach(splitKey -> flattenedMap.put(splitKey, value));
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<PathProxy, PathProxy>, String> 
findOrphanFilesMapPartitions(

Review Comment:
   Do we need `MapPartitions` at the end?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -120,6 +147,12 @@ public InternalRow[] call(InternalRow args) {
       if (fileListView != null) {
         action.compareToFileList(spark().table(fileListView));
       }
+      Preconditions.checkArgument(prefixMismatchMode == null ||
+              Lists.newArrayList("ignore", 
"error").contains(prefixMismatchMode.toLowerCase()),

Review Comment:
   We should also support delete. What about adding `fromString` to 
`PrefixMismatchMode` and using it?
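   
   For illustration, a minimal sketch of what such a factory method on the enum could look like (a guess at the shape, not the final API):
   
   ```
   // would live inside PrefixMismatchMode; needs java.util.Locale and Preconditions imports
   public static PrefixMismatchMode fromString(String modeAsString) {
     Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
     return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
   }
   ```
   
   The procedure could then call `PrefixMismatchMode.fromString(prefixMismatchMode)` and get `IGNORE`, `ERROR`, and `DELETE` support uniformly, with `valueOf` rejecting anything else.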



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +344,96 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(SparkSession spark,
+                                      Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesMap = flattenMap(equalSchemes);
+    Map<String, String> equalAuthoritiesMap = flattenMap(equalAuthorities);
+
+    Dataset<PathProxy> normalizedActualFileDF = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("actual");
+    Dataset<PathProxy> normalizedValidFileDF = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemesMap, equalAuthoritiesMap),
+        Encoders.bean(PathProxy.class)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    SetAccumulator<Pair<String, String>> setAccumulator = new 
SetAccumulator<>();
+    spark.sparkContext().register(setAccumulator);
+
+    List<String> orphanFiles = 
normalizedActualFileDF.joinWith(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .mapPartitions(findOrphanFilesMapPartitions(prefixMismatchMode, 
setAccumulator), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all 
orphan files." +
+          " Found file paths that have same file path but different 
authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> 
toBeFlattenedMap) {

Review Comment:
   nit: `toBeFlattenedMap` -> `map`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

