aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r930431757


##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java:
##########
@@ -400,4 +409,74 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws 
Exception {
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode() throws 
NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    } else {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
LOCATION '%s'", tableName,
+          temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, 
"path/to/data-a.parquet").toString())

Review Comment:
   nit: the code in the added test seems to be indented with 4/8 spaces instead 
of 2/4



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,25 +102,21 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = 
ImmutableMap.of("s3n,s3a", "s3");
+
   private final Consumer<String> defaultDelete = new Consumer<String>() {
     @Override
     public void accept(String file) {
       table.io().deleteFile(file);
     }
   };
-
+  private Map<String, String> equalSchemes = Collections.emptyMap();

Review Comment:
   Should it be `flattenMap(EQUAL_SCHEMES_DEFAULT)` if the user does not set 
equal schemes?
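
For illustration, a sketch of how the flattened defaults could serve as the fallback. It uses only the JDK rather than Guava; `EqualSchemes`, `flatten`, and `withDefaults` are hypothetical names, and letting user entries override the defaults is an assumption, not something this PR settles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: shows defaults being flattened
// and used when the caller supplies nothing.
final class EqualSchemes {
  // Same shape as EQUAL_SCHEMES_DEFAULT in the diff: comma-separated keys share one value
  static final Map<String, String> DEFAULT = Map.of("s3n,s3a", "s3");

  private EqualSchemes() {
  }

  // Splits each comma-separated key into its own entry, trimming whitespace
  static Map<String, String> flatten(Map<String, String> map) {
    Map<String, String> flattened = new HashMap<>();
    if (map != null) {
      for (Map.Entry<String, String> entry : map.entrySet()) {
        for (String key : entry.getKey().split(",")) {
          flattened.put(key.trim(), entry.getValue().trim());
        }
      }
    }
    return flattened;
  }

  // Assumption: start from the flattened defaults and let user entries override them
  static Map<String, String> withDefaults(Map<String, String> userSchemes) {
    Map<String, String> result = flatten(DEFAULT);
    result.putAll(flatten(userSchemes));
    return result;
  }
}
```

With this shape, `EqualSchemes.withDefaults(null)` still maps both `s3n` and `s3a` to `s3`, so the action behaves sensibly when the user never sets equal schemes.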



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,53 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Passes a prefix mismatch mode that determines how this action should 
handle situations when
+   * the metadata references files that match listed/provided files except for 
authority/scheme.
+   * <p>
+   * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch 
mode is "ERROR",
+   * which means an exception is thrown whenever there is a mismatch in 
authority/scheme.
+   * It's the recommended mismatch mode and should be changed only in some 
rare circumstances.
+   * If there is a mismatch, use {@link #newEqualSchemes(Map)} (Map)} and 
{@link #newEqualAuthorities(Map)} (Map)}

Review Comment:
   Looks like this was missed.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -121,6 +148,12 @@ public InternalRow[] call(InternalRow args) {
         action.compareToFileList(spark().table(fileListView));
       }
 
+      action.equalSchemes(equalSchemes);
+      action.equalAuthorities(equalAuthorities);
+      if (prefixMismatchMode != null) {

Review Comment:
   nit: let's add an empty line before the if statement to match all other if 
statements above



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = 
ImmutableMap.of("s3n,s3a", "s3");
+
+  private Map<String, String> equalSchemes = ImmutableMap.of();

Review Comment:
   What if the user does not set equal schemes?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -120,6 +147,13 @@ public InternalRow[] call(InternalRow args) {
       if (fileListView != null) {
         action.compareToFileList(spark().table(fileListView));
       }
+      Preconditions.checkArgument(prefixMismatchMode == null ||

Review Comment:
   This will force the user to match the upper case expected by `valueOf`. The 
solution above is a bit more flexible. We do that in many other enums.
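
A minimal sketch of a case-insensitive `fromString`, assuming the three modes listed in the Javadoc (`ERROR`, `IGNORE`, `DELETE`). The enum below is a standalone stand-in, not the actual `DeleteOrphanFiles.PrefixMismatchMode`, and the error messages are made up for the example.

```java
import java.util.Locale;

// Hypothetical stand-in for DeleteOrphanFiles.PrefixMismatchMode; the constants
// come from the Javadoc in this PR, the method body is an assumption.
enum PrefixMismatchMode {
  ERROR, IGNORE, DELETE;

  // Upper-cases the input before delegating to valueOf, so "delete" and "Delete" both work.
  // Locale.ENGLISH keeps the conversion independent of the JVM's default locale.
  static PrefixMismatchMode fromString(String modeAsString) {
    if (modeAsString == null) {
      throw new IllegalArgumentException("Mode should not be null");
    }

    try {
      return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Invalid mode: " + modeAsString, e);
    }
  }
}
```

The procedure could then call `PrefixMismatchMode.fromString(...)` on the raw argument instead of validating against `valueOf` directly, so users are not forced to pass upper-case values.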



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,53 @@ public interface DeleteOrphanFiles extends 
Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  /**
+   * Passes a prefix mismatch mode that determines how this action should 
handle situations when
+   * the metadata references files that match listed/provided files except for 
authority/scheme.
+   * <p>
+   * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch 
mode is "ERROR",
+   * which means an exception is thrown whenever there is a mismatch in 
authority/scheme.
+   * It's the recommended mismatch mode and should be changed only in some 
rare circumstances.
+   * If there is a mismatch, use {@link #equalSchemes(Map)} (Map)} and {@link 
#equalAuthorities(Map)} (Map)}

Review Comment:
   nit: broken Javadoc



##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java:
##########
@@ -400,4 +409,74 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws 
Exception {
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode() throws 
NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    } else {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
LOCATION '%s'", tableName,
+          temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, 
"path/to/data-a.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, 
"path/to/data-b.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    table.newFastAppend()
+            .appendFile(dataFile1)
+            .appendFile(dataFile2)
+            .commit();
+
+    List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(

Review Comment:
   It is a little bit hard to read. I think you can simplify it by using for 
loops and slightly different formatting.
   
   ```
   List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
       new FilePathLastModifiedRecord(
           new Path(originalPath, "path/to/data-a.parquet").toString(),
           LAST_MODIFIED_TIMESTAMP),
       new FilePathLastModifiedRecord(
           new Path(originalPath, "path/to/data-b.parquet").toString(),
           LAST_MODIFIED_TIMESTAMP),
       new FilePathLastModifiedRecord(
           ReachableFileUtil.versionHintLocation(table),
           LAST_MODIFIED_TIMESTAMP));
   
   for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
     allFiles.add(new FilePathLastModifiedRecord(file, LAST_MODIFIED_TIMESTAMP));
   }
   
   for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
     allFiles.add(new FilePathLastModifiedRecord(manifest.path(), LAST_MODIFIED_TIMESTAMP));
   }
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +345,99 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = 
actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles = 
actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+        .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), 
Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!conflicts.value().isEmpty()) {
+      throw new ValidationException("Unable to determine whether certain files 
are orphan. " +
+          "Metadata references files that match listed/provided files except 
for authority/scheme. " +
+          "Please, inspect the conflicting authorities/schemes and provide 
which of them are equal " +
+          "by further configuring the action via equalSchemes() and 
equalAuthorities() methods. " +
+          "Set the prefix mismatch mode to 'NONE' to ignore remaining 
locations with conflicting " +
+          "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident 
that remaining conflicting " +
+          "authorities/schemes are different. It will be impossible to recover 
deleted files. " +
+          "Conflicting authorities/schemes: %s.", conflicts.value());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> map) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (map != null) {
+      for (String key : map.keySet()) {
+        String value = map.get(key);
+        for (String splitKey : COMMA.split(key)) {
+          flattenedMap.put(splitKey.trim(), value.trim());
+        }
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, 
String> findOrphanFiles(
+      PrefixMismatchMode mode,
+      SetAccumulator<Pair<String, String>> conflicts) {
+    return rows -> {
+      Iterator<String> transformed = Iterators.transform(rows, row -> {
+        FileMetadata actual = row._1;
+        FileMetadata valid = row._2;
+
+        if (valid == null) {
+          return actual.location;
+        }
+
+        boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+            valid.scheme.equalsIgnoreCase(actual.scheme);
+        boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+            valid.authority.equalsIgnoreCase(actual.authority);
+
+        if ((!schemeMatch || !authorityMatch) && mode == 
PrefixMismatchMode.DELETE) {
+          return actual.location;
+        } else {
+          if (!schemeMatch) {
+            conflicts.add(Pair.of(valid.scheme, actual.scheme));
+          }
+          if (!authorityMatch) {
+            conflicts.add(Pair.of(valid.authority, actual.authority));
+          }
+        }
+
+        return null;
+      });
+      return Iterators.filter(transformed, Objects::nonNull);
+    };
+  }
+
+  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+      Map<String, String> equalSchemesMap, Map<String, String> 
equalAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String location = row.getString(0);
+      URI uri = new Path(location).toUri();
+      String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), 
uri.getScheme());
+      String authority = equalAuthoritiesMap.getOrDefault(uri.getAuthority(), 
uri.getAuthority());
+      return new FileMetadata(scheme, authority, uri.getPath(), location);
+    });
+  }
+
+

Review Comment:
   nit: extra empty line



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +345,99 @@ private static FlatMapFunction<Iterator<String>, String> 
listDirsRecursively(
     };
   }
 
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = 
actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles = 
actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+        .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), 
Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && 
!conflicts.value().isEmpty()) {
+      throw new ValidationException("Unable to determine whether certain files 
are orphan. " +
+          "Metadata references files that match listed/provided files except 
for authority/scheme. " +
+          "Please, inspect the conflicting authorities/schemes and provide 
which of them are equal " +
+          "by further configuring the action via equalSchemes() and 
equalAuthorities() methods. " +
+          "Set the prefix mismatch mode to 'NONE' to ignore remaining 
locations with conflicting " +
+          "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident 
that remaining conflicting " +
+          "authorities/schemes are different. It will be impossible to recover 
deleted files. " +
+          "Conflicting authorities/schemes: %s.", conflicts.value());
+    }
+    return orphanFiles;

Review Comment:
   nit: let's add an empty line before the return statement to separate the if 
block



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -94,6 +100,27 @@ public InternalRow[] call(InternalRow args) {
     Preconditions.checkArgument(maxConcurrentDeletes == null || 
maxConcurrentDeletes > 0,
             "max_concurrent_deletes should have value > 0,  value: " + 
maxConcurrentDeletes);
 
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    if (!args.isNullAt(6)) {
+      args.getMap(6).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalSchemes.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    if (!args.isNullAt(7)) {
+      args.getMap(7).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalSchemes.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode = args.isNullAt(8) 
? null :

Review Comment:
   nit: I'd add a direct import for `PrefixMismatchMode` to shorten the line 
and add `fromString` to the enum, as discussed 
[here](https://github.com/apache/iceberg/pull/4652/#discussion_r927171675).



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
     extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements 
DeleteOrphanFiles {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String 
path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");

Review Comment:
   I think this still remains open, as we have `LOG` (a static var) followed by 
final instance vars and then static vars again. We should group `LOG`, `COMMA`, 
and `EQUAL_SCHEMES_DEFAULT`, and then add an empty line before the final 
instance vars, like in the formatting above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

