karuppayya commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r892770784


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> 
specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new 
PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+    return filesDF.mapPartitions((MapPartitionsFunction<Row, Row>) input ->
+        Iterators.transform(input, row -> {
+          String pathString = row.getString(0);
+          Path path = new Path(pathString);
+          URI uri = path.toUri();
+          List<Object> values = Lists.newArrayList(uri.getScheme(), 
uri.getAuthority(), uri.getPath(), pathString);
+          return 
Row$.MODULE$.apply(scala.collection.JavaConverters.asScalaBuffer(values).toSeq());
+        }), RowEncoder.apply(fileSchema));
+  }
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<Row, 
String> {
+
+    private final List<String> equivalentSchemes;
+    private final List<String> equivalentAuthorities;
+    private final PrefixMisMatchMode mismatchMode;
+    private final StructType scheme;
+
+    MapOrphanFilesFunction(List<String> equivalentSchemes,
+                           List<String> equivalentAuthorities,
+                           PrefixMisMatchMode mismatchMode,
+                           StructType schema) {
+      this.equivalentSchemes = equivalentSchemes;
+      this.equivalentAuthorities = equivalentAuthorities;
+      this.mismatchMode = mismatchMode;
+      this.scheme = schema;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<Row> value) throws Exception {
+
+      Iterator<String> orphanFilesIterator = Iterators.transform(value, row -> 
{
+        if (isOrphan(row)) {
+          return row.getString(3);
+        } else {
+          return null;
+        }
+      });
+      return Iterators.filter(orphanFilesIterator, StringUtils::isNotBlank);
+    }
+
+    boolean isOrphan(Row row) {
+      String[] fields = scheme.fieldNames();
+
+      // actual file related fields
+      assert (fields[0].equalsIgnoreCase("scheme"));
+      String actualScheme = row.getString(0);
+      assert (fields[1].equalsIgnoreCase("authority"));
+      String actualAuthority = row.getString(1);
+      assert (fields[3].equalsIgnoreCase("file_path"));
+      String actualFullPath = row.getString(3);
+
+      // valid files related fields
+      assert (fields[4].equalsIgnoreCase("scheme"));
+      String validScheme = row.getString(4);
+      assert (fields[5].equalsIgnoreCase("authority"));
+      String validAuthority = row.getString(5);
+      assert (fields[6].equalsIgnoreCase("path"));
+      String validPath = row.getString(6);
+      assert (fields[7].equalsIgnoreCase("file_path"));
+      String validFullPath = row.getString(7);
+      if (validPath == null) {
+        return true;
+      }
+      boolean isEquivalent = isEquivalent(equivalentSchemes, actualScheme, 
validScheme) &&
+          isEquivalent(equivalentAuthorities, actualAuthority, validAuthority);
+      if (!isEquivalent && mismatchMode == PrefixMisMatchMode.ERROR) {
+        throw new ValidationException("file path from listing: %s, file path 
from metadata: %s ," +
+            " Unable to determine if the file is an orphan. There is a 
mismatch in the scheme and/or authority" +
+            " of the file.", actualFullPath, validFullPath);
+      }
+      return false;
+    }
+
+    boolean isEquivalent(List<String> equivalents, String actual, String 
valid) {
+      boolean equivalent = StringUtils.isBlank(valid);

Review Comment:
   @rdblue When writing to the metadata file, the authority can be empty. That 
is my understanding; please let me know if it is incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to