RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490352331



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required to compare the valid files against all files and find the orphans.
+   * In the resulting data set, only the path portion is compared when matching valid and actual file paths.
+   * With Hadoop or S3, the same path can be reachable through different authority names, which would
+   * otherwise make files that are part of the table metadata look like orphans.
+   *
+   * @param filePathWithSchemeAndAuthority : complete file path; may include scheme, authority, and path.
+   * @return : {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        fileWithRelativePath.apply(
+            filePathWithSchemeAndAuthority.apply(FILE_PATH)
+        )).selectExpr(
+        URI_DETAIL + ". " + RELATIVE_FILE_PATH + " as "  + RELATIVE_FILE_PATH, // relative path

Review comment:
       There is an extra space in here; I would suggest doing this as a
       String.format just to be safe.
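
       For example, a rough sketch (untested; it only builds the first selectExpr
       argument and reuses the existing URI_DETAIL and RELATIVE_FILE_PATH constants):

           // builds "<uri detail column>.<relative path column> as <relative path column>"
           // without any hand-assembled spacing
           String relativePathExpr =
               String.format("%s.%s as %s", URI_DETAIL, RELATIVE_FILE_PATH, RELATIVE_FILE_PATH);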

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * transform a file path to

Review comment:
       Nit: capitalization

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
     }
   }, DataTypes.StringType);
 
+  private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+    return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+  }, DataTypes.StringType);
+
+  /**
+   * transform a file path to
+   * {@code Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+   */
+  private static final UserDefinedFunction fileWithRelativePath = functions.udf((String fullyQualifiedPath) -> {

Review comment:
       I don't want to bikeshed too much here, but I think this name ends up being
       confusing because later we use it in conjunction with very similarly named
       variables that represent DataFrames and also have apply methods. I would
       recommend renaming this to something with "UDF" in it so it's clear.
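
       For instance, with a name like relativePathUDF (a placeholder; the lambda body
       and return type would stay exactly as in the PR), the later call site reads
       unambiguously as code applied to data:

           // outer apply: invoking the UDF (code)
           // inner apply: Dataset.apply(columnName) resolving a Column (data)
           relativePathUDF.apply(filePathWithSchemeAndAuthority.apply(FILE_PATH))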

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);
+    Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+    return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+        .as(Encoders.STRING())
+        .collectAsList();
+  }
+
+  /**
+   * From
+   * <pre>{@code
+   * Dataset<Row<file_path_with_scheme_authority>>
+   *    will be transformed to
+   *    Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+   *  }</pre>
+   *
+   * This is required to compare the valid files against all files and find the orphans.
+   * In the resulting data set, only the path portion is compared when matching valid and actual file paths.
+   * With Hadoop or S3, the same path can be reachable through different authority names, which would
+   * otherwise make files that are part of the table metadata look like orphans.
+   *
+   * @param filePathWithSchemeAndAuthority : complete file path; may include scheme, authority, and path.
+   * @return : {@code file_path_no_scheme_authority, file_path}
+   */
+  protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+    return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+        fileWithRelativePath.apply(

Review comment:
       This is where the UDF naming looks confusing to me, since we call apply on
       two objects that are very different but have similar names: one is data and
       the other is code.

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");
+      whiteSpaceDir.mkdirs();
+
+      Table table = TABLES.create(
+          SCHEMA,
+          PartitionSpec.unpartitioned(),
+          Maps.newHashMap(),
+          tableDir.getAbsolutePath() + "/white space");
+
+      List<ThreeColumnRecord> records = Lists.newArrayList(
+          new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+      );
+
+      Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+      df.select("c1", "c2", "c3")
+          .write()
+          .format("iceberg")
+          .mode("append")
+          .save(whiteSpaceDir.getAbsolutePath());
+
+      List<String> validFiles = spark.read().format("iceberg")
+          .load(whiteSpaceDir + "#files")
+          .select("file_path")
+          .as(Encoders.STRING())
+          .collectAsList();
+      Assert.assertEquals("Should be 1 valid file", 1, validFiles.size());
+      String validFile = validFiles.get(0);
+
+      df.write().mode("append").parquet(whiteSpaceDir + "/data");
+
+      Path dataPath = new Path(whiteSpaceDir + "/data");
+      FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
+      List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+          .filter(FileStatus::isFile)
+          .map(file -> file.getPath().toString())
+          .collect(Collectors.toList());
+      Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+
+      List<String> invalidFiles = Lists.newArrayList(allFiles);
+      invalidFiles.removeIf(file -> file.contains(validFile));
+      Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+
+      // sleep for 1 second to ensure files will be old enough
+      Thread.sleep(1000);
+
+      Actions actions = Actions.forTable(table);
+      List<String> result = actions.removeOrphanFiles()
+          .olderThan(System.currentTimeMillis())
+          .execute();
+      Assert.assertEquals("Action should find 1 file", invalidFiles, result);

Review comment:
       Couldn't we check whether result matches invalid files?

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
       return files.iterator();
     };
   }
+
+  protected static List<String> findOrphanFiles(
+      Dataset<Row> validFileDF,
+      Dataset<Row> actualFileDF) {
+    Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+        .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+    Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+        .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+    Column joinCond = nameEqual.and(pathContains);

Review comment:
       Isn't pathContains always true when nameEqual is? I feel like you just need
       nameEqual here.
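
       That is, something like this (a sketch only):

           Column joinCond = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
               .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
           Dataset<Row> orphans = actualFileDF.join(validFileDF, joinCond, "leftanti");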

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
     Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
   }
 
+  @Test
+  public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+    File whiteSpaceDir = null;
+    try {
+      whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");

Review comment:
       Not sure we need to be careful with cleanup here; couldn't we just use
       temp.newDir and then skip the cleanup?
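
       For example, assuming the test class already has a JUnit TemporaryFolder rule
       named temp (a sketch; TemporaryFolder exposes newFolder rather than newDir):

           // JUnit deletes the folder after the test, so no manual cleanup is needed
           File whiteSpaceDir = temp.newFolder("white space");

           Table table = TABLES.create(
               SCHEMA,
               PartitionSpec.unpartitioned(),
               Maps.newHashMap(),
               whiteSpaceDir.getAbsolutePath());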

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +672,349 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFindOrphanFilesWithSameAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 3);
+
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(validFilesData);
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertNotNull(orphanFiles);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test
+  public void testFindOrphanFilesWithValidFileHasNoAuthority() throws Exception {
+    List<Row> validFilesData = getRowsWithFilePath(HDFS_USER_LOG_DATA_DUMMY_FILE, 4);
+    Dataset<Row> validFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<Row> actualFilesData = new ArrayList<>();
+
+    actualFilesData.addAll(getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 4));
+    List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+    actualFilesData.addAll(expectedOrphanFiles);
+
+    Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+        .addRelativePathColumn(
+            spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+    List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+    Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+        .collect(Collectors.toList()), orphanFiles);
+  }
+
+  @Test

Review comment:
       My gut tells me we should be able to parameterize all of these tests around
       the file name so we can remove a bit of duplicate code here.
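
       For example, a rough JUnit 4 sketch (the class and parameter names here are
       placeholders; the constants come from the existing test):

           import org.junit.runner.RunWith;
           import org.junit.runners.Parameterized;

           @RunWith(Parameterized.class)
           public class TestFindOrphanFiles {

             @Parameterized.Parameters(name = "validFilePath = {0}")
             public static Object[] parameters() {
               return new Object[] {
                   HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE,
                   HDFS_USER_LOG_DATA_DUMMY_FILE
               };
             }

             @Parameterized.Parameter
             public String validFilePath;

             // a single test body would build validFileDS from validFilePath and reuse
             // the same actual/orphan file setup for every parameter value
           }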






