aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r866297319


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+

Review Comment:
   nit: extra empty line



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
     extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> 
implements DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;

Review Comment:
   I think the default mode should be error, to draw attention to the 
problem.



##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -128,6 +128,21 @@ public BaseDeleteOrphanFilesSparkAction 
executeDeleteWith(ExecutorService execut
     return this;
   }
 
+  @Override
+  public DeleteOrphanFiles prefixMismatchMode(String mode) {

Review Comment:
   If you add default implementations, you can avoid touching all Spark 
versions.



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);

Review Comment:
   I think you can move the enum from the Spark module here and make it nested 
within this action interface (e.g. right above the result interface). Then we 
can refer to it in this method instead of using a string.
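
A minimal sketch of what nesting the enum in the action interface could look like. `OrphanFilesActionSketch`, `RecordingOrphanFilesAction`, `fromString`, and the exact constant names are assumptions for illustration, not the actual Iceberg API:

```java
import java.util.Locale;

// Hypothetical, trimmed-down stand-in for the action interface (not the real Iceberg API).
interface OrphanFilesActionSketch {

  // Typed mode nested in the interface, so every engine module shares one definition.
  // The constant names are assumptions; the thread only mentions "ignore" and "error".
  enum PrefixMismatchMode {
    ERROR, IGNORE;

    // Helper for callers that only have a string, e.g. a procedure argument.
    static PrefixMismatchMode fromString(String mode) {
      return valueOf(mode.trim().toUpperCase(Locale.ROOT));
    }
  }

  // The method takes the enum, so an invalid mode fails at compile time.
  OrphanFilesActionSketch prefixMismatchMode(PrefixMismatchMode mode);
}

// Minimal implementation that just records the chosen mode.
class RecordingOrphanFilesAction implements OrphanFilesActionSketch {
  private PrefixMismatchMode mode = PrefixMismatchMode.ERROR;

  @Override
  public OrphanFilesActionSketch prefixMismatchMode(PrefixMismatchMode newMode) {
    this.mode = newMode;
    return this;
  }

  PrefixMismatchMode mode() {
    return mode;
  }
}
```

With a typed parameter, string parsing is only needed at the boundary where user input arrives, and the action itself never sees an unknown mode.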



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);

Review Comment:
   All newly added methods should have default implementations throwing an 
exception.
   
   ```
   throw new UnsupportedOperationException(this.getClass().getName() + " does not implement XXX");
   ```
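
For illustration, a self-contained sketch of that pattern. The interface and class names here are hypothetical stand-ins, and only one of the new methods is shown:

```java
import java.util.List;

// Hypothetical stand-in for the action interface; names are illustrative only.
interface OrphanFilesActionWithDefaults {

  // Pre-existing abstract method that every implementation already provides.
  OrphanFilesActionWithDefaults location(String location);

  // Newly added method: the default body means older implementations still compile.
  default OrphanFilesActionWithDefaults equivalentSchemes(List<String> schemes) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement equivalentSchemes");
  }
}

// An implementation written before the new method existed; it needs no changes.
class LegacyOrphanFilesAction implements OrphanFilesActionWithDefaults {
  @Override
  public OrphanFilesActionWithDefaults location(String location) {
    return this;
  }
}
```

Implementations that do not override the new method fail only when it is actually called, and the exception message names the concrete class.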



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+  /**
+   * Pass a list of authorities to be considered equivalent when finding 
orphan files

Review Comment:
   Question: what if I have the following use case where `bucket1` and 
`bucket2` are indeed different?
   
   ```
   s3://bucket1/path/to/file.parquet - actual
   s3://bucket2/path/to/file.parquet - referenced by metadata
   ```
   
   The first file should be deleted. How can we support this case with this 
API? The only way I can think of right now is to add another method for unequal 
authorities. That seems to complicate the action quite a bit.
   
   Any better ideas?



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,28 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+
+  /**
+   * Pass a mode for handling the files that cannot be determined if they are 
orphan
+   * @param mode mode for handling files that cannot be determined if they are 
orphan
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles prefixMismatchMode(String mode);
+
+  /**
+   * Pass a list of schemes to be considered equivalent when finding orphan 
files
+   * @param equivalentSchemes list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentSchemes(List<String> equivalentSchemes);
+
+  /**
+   * Pass a list of authorities to be considered equivalent when finding 
orphan files
+   * @param equivalentAuthorities list of equivalent schemes
+   * @return this for method chaining
+   */
+  DeleteOrphanFiles equivalentAuthorities(List<String> equivalentAuthorities);

Review Comment:
   Question: would it be better to use a map? We recently added a way to specify 
a list of actual files, so it may contain entries for multiple 
schemes/authorities.
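
A rough sketch of how a map-based variant might work, assuming each map goes from an alias to the canonical value it should be treated as. `LocationNormalizer` and its method names are hypothetical and not part of Iceberg:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Iceberg: rewrites a location so that equivalent
// schemes and authorities collapse to one canonical value before paths are compared.
final class LocationNormalizer {
  private final Map<String, String> equalSchemes;      // alias scheme -> canonical scheme
  private final Map<String, String> equalAuthorities;  // alias authority -> canonical authority

  LocationNormalizer(Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
    this.equalSchemes = new HashMap<>(equalSchemes);
    this.equalAuthorities = new HashMap<>(equalAuthorities);
  }

  // Assumes fully qualified locations; anything else is returned unchanged.
  // Query and fragment parts are ignored in this sketch.
  String normalize(String location) {
    URI uri = URI.create(location);
    if (uri.getScheme() == null || uri.getAuthority() == null) {
      return location;
    }
    String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
    String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
    return scheme + "://" + authority + uri.getPath();
  }
}
```

Because each alias maps to its own canonical value, one action can carry several independent equivalence groups, which a single flat list cannot express.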



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -78,22 +84,16 @@
     extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> 
implements DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
 
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
 
   private String location = null;
+  private PrefixMisMatchMode prefixMismatchMode = PrefixMisMatchMode.IGNORE;
+  private List<String> equivalentSchemes;

Review Comment:
   We also need to define some default values, like s3a, s3, and s3n.
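
One way the defaults could be expressed, as a sketch only. The class name, the choice of `s3` as the canonical scheme, and the override semantics are all assumptions:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for built-in scheme equivalences; not an Iceberg class.
final class EquivalentSchemeDefaults {

  // Assumption: s3a and s3n are treated as aliases of s3 unless the user says otherwise.
  static final Map<String, String> DEFAULT_EQUAL_SCHEMES;

  static {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("s3a", "s3");
    defaults.put("s3n", "s3");
    DEFAULT_EQUAL_SCHEMES = Collections.unmodifiableMap(defaults);
  }

  private EquivalentSchemeDefaults() {
  }

  // User-provided entries are layered on top of the defaults and win on conflict.
  static Map<String, String> withOverrides(Map<String, String> userProvided) {
    Map<String, String> merged = new HashMap<>(DEFAULT_EQUAL_SCHEMES);
    merged.putAll(userProvided);
    return Collections.unmodifiableMap(merged);
  }
}
```

Keeping the defaults in a shared constant means the common S3 case works without configuration, while users can still override or extend the mapping.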



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -273,4 +310,102 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
       return files.iterator();
     };
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    final StructType schema = new StructType(new StructField[]{

Review Comment:
   Seems like this can be a static constant?



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,

Review Comment:
   nit: annotate with `@VisibleForTesting`?



##########
spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -186,6 +202,27 @@ private DeleteOrphanFiles.Result doExecute() {
     return new BaseDeleteOrphanFilesActionResult(orphanFiles);
   }
 
+  static Dataset<String> getOrphanFilesDF(Dataset<Row> actualFileDF,

Review Comment:
   We usually try to avoid using `get` as it has almost no meaning. What about 
`findOrphanFiles` and returning a list of strings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

