aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r888437365


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -89,4 +117,8 @@ interface Result {
      */
     Iterable<String> orphanFileLocations();
   }
+
+  enum PrefixMisMatchMode {

Review Comment:
   nit: I believe `mismatch` is a single word so `PrefixMisMatchMode` -> 
`PrefixMismatchMode`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -114,10 +110,19 @@ public void accept(String file) {
   };
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.ERROR;
+  private List<String> equivalentSchemes;

Review Comment:
   If we decide to use shorter names for methods, I'd also rename all vars.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,33 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan

Review Comment:
   nit: I find it nice to have an empty line after the method desc before 
params. Applies to all 3 new methods.
   
   All existing methods in this class already follow this pattern.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -114,10 +110,19 @@ public void accept(String file) {
   };
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.ERROR;
+  private List<String> equivalentSchemes;
+  private List<String> equivalentAuthorities;
   private long olderThanTimestamp = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
+  private static final StructType fileSchema = new StructType(new 
StructField[]{
+      new StructField("scheme", DataTypes.StringType, true, new Metadata()),
+      new StructField("authority", DataTypes.StringType, true, new Metadata()),
+      new StructField("path", DataTypes.StringType, true, new Metadata()),
+      new StructField("file_path", DataTypes.StringType, true, new Metadata())

Review Comment:
   nit: `new Metadata()` -> `Metadata.empty()` in all calls



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> 
specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new 
PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    return filesDF.mapPartitions((MapPartitionsFunction<Row, Row>) input ->
+        Iterators.transform(input, row -> {
+          String pathString = row.getString(0);
+          Path path = new Path(pathString);
+          URI uri = path.toUri();
+          List<Object> values = Lists.newArrayList(uri.getScheme(), 
uri.getAuthority(), uri.getPath(), pathString);
+          return 
Row$.MODULE$.apply(scala.collection.JavaConverters.asScalaBuffer(values).toSeq());
+        }), RowEncoder.apply(fileSchema));
+  }
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<Row, 
String> {

Review Comment:
   Same here. I'd probably use a method that returns a function for consistency.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -114,10 +110,19 @@ public void accept(String file) {
   };
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.ERROR;
+  private List<String> equivalentSchemes;
+  private List<String> equivalentAuthorities;
   private long olderThanTimestamp = System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(3);
   private Dataset<Row> compareToFileList;
   private Consumer<String> deleteFunc = defaultDelete;
   private ExecutorService deleteExecutorService = null;
+  private static final StructType fileSchema = new StructType(new 
StructField[]{

Review Comment:
   Static constants should be defined at the top before instance variables. 
Also, constants should use capital letters. What about also giving a little bit 
more descriptive name like `FILE_METADATA_SCHEMA`? Feel free to pick up another 
name too.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +236,12 @@ private DeleteOrphanFiles.Result doExecute() {
     Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = compareToFileList == null ? 
buildActualFileDF() : filteredCompareToFileList();
 
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = 
actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, 
"leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    List<String> orphanFiles =
+        orphanFileDF(actualFileDF,
+            validFileDF,
+            equivalentSchemes,
+            equivalentAuthorities,
+            prefixMismatchMode).collectAsList();

Review Comment:
   What about calling this method `findOrphanFiles` and moving the collect call 
inside?



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,33 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles prefixMismatchMode(PrefixMisMatchMode mode) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement prefixMismatchMode");
+  }
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes) {

Review Comment:
   I wonder whether we should use `equal` instead of `equivalent`. They mean 
the same thing but the first one is shorter.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> 
specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new 
PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    return filesDF.mapPartitions((MapPartitionsFunction<Row, Row>) input ->
+        Iterators.transform(input, row -> {
+          String pathString = row.getString(0);
+          Path path = new Path(pathString);
+          URI uri = path.toUri();
+          List<Object> values = Lists.newArrayList(uri.getScheme(), 
uri.getAuthority(), uri.getPath(), pathString);
+          return 
Row$.MODULE$.apply(scala.collection.JavaConverters.asScalaBuffer(values).toSeq());
+        }), RowEncoder.apply(fileSchema));
+  }
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<Row, 
String> {
+
+    private final List<String> equivalentSchemes;
+    private final List<String> equivalentAuthorities;
+    private final PrefixMisMatchMode mismatchMode;
+    private final StructType scheme;
+
+    MapOrphanFilesFunction(List<String> equivalentSchemes,
+                           List<String> equivalentAuthorities,
+                           PrefixMisMatchMode mismatchMode,
+                           StructType schema) {
+      this.equivalentSchemes = equivalentSchemes;
+      this.equivalentAuthorities = equivalentAuthorities;
+      this.mismatchMode = mismatchMode;
+      this.scheme = schema;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<Row> value) throws Exception {
+
+      Iterator<String> orphanFilesIterator = Iterators.transform(value, row -> 
{
+        if (isOrphan(row)) {
+          return row.getString(3);
+        } else {
+          return null;
+        }
+      });
+      return Iterators.filter(orphanFilesIterator, StringUtils::isNotBlank);
+    }
+
+    boolean isOrphan(Row row) {
+      String[] fields = scheme.fieldNames();
+
+      // actual file related fields
+      assert (fields[0].equalsIgnoreCase("scheme"));

Review Comment:
   I am not a big fan of using Java asserts as they may cause some interesting 
issues like prohibit inlining and generally complicate the code. What about 
using indices directly since we construct the incoming `DataFrame` ourselves?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> 
specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new 
PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {

Review Comment:
   This class uses a slightly different style for defining Spark transforms 
(see `listDirsRecursively` above).
   
   What about something like this?
   
   ```
     private static MapPartitionsFunction<Row, Row> toFileMetadata(Map<String, 
String> equalSchemes,
                                                                   Map<String, 
String> equalAuthorities) {
       return rows -> Iterators.transform(rows, row -> {
         String filePathAsString = row.getString(0);
         URI uri = new Path(filePathAsString).toUri();
   
         String scheme = uri.getScheme();
         String normalizedScheme = equalSchemes.getOrDefault(scheme, scheme);
   
         String authority = uri.getAuthority();
         String normalizedAuthority = equalAuthorities.getOrDefault(authority, 
authority);
   
         return RowFactory.create(normalizedScheme, normalizedAuthority, 
uri.getPath(), filePathAsString);
       });
     }
   ```
   
   Then we can call it like this:
   
   ```
   Dataset<Row> validFileMetadataDF = validFileDF
       .mapPartitions(toFileMetadata(equalSchemes, equalAuthorities), 
FILE_METADATA_ENCODER)
       .as("valid");
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -51,7 +54,10 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
       ProcedureParameter.optional("location", DataTypes.StringType),
       ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
       ProcedureParameter.optional("max_concurrent_deletes", 
DataTypes.IntegerType),
-      ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+      ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+      ProcedureParameter.optional("equivalent_schemes", 
DataTypes.createArrayType(DataTypes.StringType)),

Review Comment:
   I think shorter names would be handy in the procedure.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+  /**
+   * Pass a list of authorities to be considered equivalent when finding 
orphan files

Review Comment:
   I spent more time thinking and couldn't come up with anything better than 
`unequalAuthorities`.
   Do you have any ideas, @karuppayya @flyrain @RussellSpitzer @szehon-ho 
@rdblue?



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,33 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles prefixMismatchMode(PrefixMisMatchMode mode) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does 
not implement prefixMismatchMode");
+  }
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  default DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes) {

Review Comment:
   I think we should use `Map<String, String>` for scheme and authority 
equivalency.



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java:
##########
@@ -171,5 +171,4 @@ public void resetSparkSessionCatalog() throws Exception {
     spark.conf().unset("spark.sql.catalog.spark_catalog.type");
     spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
   }
-

Review Comment:
   nit: unnecessary change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to