RussellSpitzer commented on a change in pull request #1471:
URL: https://github.com/apache/iceberg/pull/1471#discussion_r490352331
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
return files.iterator();
};
}
+
+ protected static List<String> findOrphanFiles(
+ Dataset<Row> validFileDF,
+ Dataset<Row> actualFileDF) {
+ Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+ .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+ Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+ .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+ Column joinCond = nameEqual.and(pathContains);
+ Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+ return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+ .as(Encoders.STRING())
+ .collectAsList();
+ }
+
+ /**
+ * From
+ * <pre>{@code
+ * Dataset<Row<file_path_with_scheme_authority>>
+ * will be transformed to
+ * Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+ * }</pre>
+ *
+ * This is required to compare the valid and all files when finding orphan files.
+ * Based on the resulting data set, only the path portion is compared between the valid and all file paths.
+ * In the case of HDFS or S3, the same path can be reached under different authority names, so the listing can
+ * return files that are actually part of the metadata and not orphans.
+ *
+ * @param filePathWithSchemeAndAuthority complete file path; can include scheme, authority and path.
+ * @return {@code file_path_no_scheme_authority, file_path}
+ */
+ protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+ return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+ fileWithRelativePath.apply(
+ filePathWithSchemeAndAuthority.apply(FILE_PATH)
+ )).selectExpr(
+ URI_DETAIL + ". " + RELATIVE_FILE_PATH + " as " + RELATIVE_FILE_PATH, // relative path
Review comment:
Extra space in here. I would suggest doing this as a String.format just to be safe.
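Something along these lines is what I am picturing (just a sketch; URI_DETAIL and RELATIVE_FILE_PATH are the constants already defined in this class):

    // Hypothetical sketch only: building the selectExpr argument with String.format makes
    // the separator and alias explicit, so a stray space can't sneak in unnoticed.
    String relativePathExpr = String.format("%s.%s as %s", URI_DETAIL, RELATIVE_FILE_PATH, RELATIVE_FILE_PATH);

and then pass relativePathExpr to selectExpr in place of the concatenation.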
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
}
}, DataTypes.StringType);
+ private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+ return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+ }, DataTypes.StringType);
+
+ /**
+ * transform a file path to
Review comment:
Nit: capitalization
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -79,6 +87,20 @@
}
}, DataTypes.StringType);
+ private static final UserDefinedFunction decode = functions.udf((String fullyQualifiedPath) -> {
+ return URLDecoder.decode(fullyQualifiedPath, "UTF-8");
+ }, DataTypes.StringType);
+
+ /**
+ * transform a file path to
+ * {@code Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>}
+ */
+ private static final UserDefinedFunction fileWithRelativePath = functions.udf((String fullyQualifiedPath) -> {
Review comment:
I don't want to bikeshed too much here, but I think this name ends up being confusing because later we use it in conjunction with very similarly named variables that represent DataFrames and also use apply methods. I would recommend we rename this to something with "UDF" in it so it's clear.
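For example (fileWithRelativePathUdf is just an illustrative name; nothing in the PR is actually renamed by this sketch):

    // With a "Udf" suffix the two apply() calls read unambiguously: the UDF is code applied to a
    // Column, while filePathWithSchemeAndAuthority.apply(FILE_PATH) is selecting data from a Dataset.
    Column uriDetail = fileWithRelativePathUdf.apply(filePathWithSchemeAndAuthority.apply(FILE_PATH));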
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
return files.iterator();
};
}
+
+ protected static List<String> findOrphanFiles(
+ Dataset<Row> validFileDF,
+ Dataset<Row> actualFileDF) {
+ Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+ .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+ Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+ .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+ Column joinCond = nameEqual.and(pathContains);
+ Column decodeFilepath = decode.apply(actualFileDF.col(FILE_PATH));
+ return actualFileDF.join(validFileDF, joinCond, "leftanti").select(decodeFilepath)
+ .as(Encoders.STRING())
+ .collectAsList();
+ }
+
+ /**
+ * From
+ * <pre>{@code
+ * Dataset<Row<file_path_with_scheme_authority>>
+ * will be transformed to
+ * Dataset<Row<file_path_no_scheme_authority, file_path_with_scheme_authority>>
+ * }</pre>
+ *
+ * This is required to compare the valid and all files when finding orphan files.
+ * Based on the resulting data set, only the path portion is compared between the valid and all file paths.
+ * In the case of HDFS or S3, the same path can be reached under different authority names, so the listing can
+ * return files that are actually part of the metadata and not orphans.
+ *
+ * @param filePathWithSchemeAndAuthority complete file path; can include scheme, authority and path.
+ * @return {@code file_path_no_scheme_authority, file_path}
+ */
+ protected static Dataset<Row> addRelativePathColumn(Dataset<Row> filePathWithSchemeAndAuthority) {
+ return filePathWithSchemeAndAuthority.withColumn(URI_DETAIL,
+ fileWithRelativePath.apply(
Review comment:
This is where the UDF naming looks confusing to me, since we call apply on two very different objects with similar names: one is data and one is code.
##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
}
+ @Test
+ public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+ File whiteSpaceDir = null;
+ try {
+ whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");
+ whiteSpaceDir.mkdirs();
+
+ Table table = TABLES.create(
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ Maps.newHashMap(),
+ tableDir.getAbsolutePath() + "/white space");
+
+ List<ThreeColumnRecord> records = Lists.newArrayList(
+ new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+ df.select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(whiteSpaceDir.getAbsolutePath());
+
+ List<String> validFiles = spark.read().format("iceberg")
+ .load(whiteSpaceDir + "#files")
+ .select("file_path")
+ .as(Encoders.STRING())
+ .collectAsList();
+ Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
+ String validFile = validFiles.get(0);
+
+ df.write().mode("append").parquet(whiteSpaceDir + "/data");
+
+ Path dataPath = new Path(whiteSpaceDir + "/data");
+ FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
+ List<String> allFiles = Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
+ .filter(FileStatus::isFile)
+ .map(file -> file.getPath().toString())
+ .collect(Collectors.toList());
+ Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+
+ List<String> invalidFiles = Lists.newArrayList(allFiles);
+ invalidFiles.removeIf(file -> file.contains(validFile));
+ Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+
+ // sleep for 1 second to ensure files will be old enough
+ Thread.sleep(1000);
+
+ Actions actions = Actions.forTable(table);
+ List<String> result = actions.removeOrphanFiles()
+ .olderThan(System.currentTimeMillis())
+ .execute();
+ Assert.assertEquals("Action should find 1 file", invalidFiles, result);
Review comment:
Couldn't we check whether result matches invalid files?
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -254,4 +270,53 @@ private static void listDirRecursively(
return files.iterator();
};
}
+
+ protected static List<String> findOrphanFiles(
+ Dataset<Row> validFileDF,
+ Dataset<Row> actualFileDF) {
+ Column nameEqual = filename.apply(actualFileDF.col(RELATIVE_FILE_PATH))
+ .equalTo(filename.apply(validFileDF.col(RELATIVE_FILE_PATH)));
+
+ Column pathContains = actualFileDF.col(RELATIVE_FILE_PATH)
+ .contains(validFileDF.col(RELATIVE_FILE_PATH));
+
+ Column joinCond = nameEqual.and(pathContains);
Review comment:
Isn't pathContains always true whenever nameEqual is? I feel like you just need nameEqual here.
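Roughly what I have in mind (a sketch only, reusing the nameEqual column built above):

    // Sketch: if the file-name match is sufficient on its own, the join condition
    // collapses to nameEqual and the pathContains column can be dropped entirely.
    Column joinCond = nameEqual;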
##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -528,6 +566,71 @@ public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, Inte
Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0))));
}
+ @Test
+ public void testRemoveOrphanFilesWithSpecialCharsFilePath() throws IOException, InterruptedException {
+ File whiteSpaceDir = null;
+ try {
+ whiteSpaceDir = new File(tableDir.getAbsolutePath() + "/white space");
Review comment:
Not sure we need to be this careful with cleanup here; couldn't we just use temp.newDir and then skip the cleanup?
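Roughly (assuming this test class keeps the usual JUnit TemporaryFolder rule, here named temp; the actual API method is newFolder):

    // Sketch: let the TemporaryFolder rule own the directory and its cleanup,
    // which would remove the surrounding try/finally entirely.
    File whiteSpaceDir = temp.newFolder("white space");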
##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +672,349 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
.collectAsList();
Assert.assertEquals("Rows must match", records, actualRecords);
}
+
+ @Test
+ public void testFindOrphanFilesWithSameAuthority() throws Exception {
+ List<Row> validFilesData = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 3);
+
+ Dataset<Row> validFileDS = RemoveOrphanFilesAction
+ .addRelativePathColumn(
+ spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+ List<Row> actualFilesData = new ArrayList<>();
+
+ actualFilesData.addAll(validFilesData);
+ List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+ actualFilesData.addAll(expectedOrphanFiles);
+
+ Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+ .addRelativePathColumn(
+ spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+ List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+ Assert.assertNotNull(orphanFiles);
+
+ Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+ .collect(Collectors.toList()), orphanFiles);
+ }
+
+ @Test
+ public void testFindOrphanFilesWithValidFileHasNoAuthority() throws Exception {
+ List<Row> validFilesData = getRowsWithFilePath(HDFS_USER_LOG_DATA_DUMMY_FILE, 4);
+ Dataset<Row> validFileDS = RemoveOrphanFilesAction
+ .addRelativePathColumn(
+ spark.createDataset(validFilesData, RowEncoder.apply(constructStructureWithString())));
+
+ List<Row> actualFilesData = new ArrayList<>();
+
+ actualFilesData.addAll(getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 4));
+ List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
+ actualFilesData.addAll(expectedOrphanFiles);
+
+ Dataset<Row> actualFileDS = RemoveOrphanFilesAction
+ .addRelativePathColumn(
+ spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));
+
+ List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
+
+ Assert.assertEquals(expectedOrphanFiles.stream().map(row -> row.get(0).toString())
+ .collect(Collectors.toList()), orphanFiles);
+ }
+
+ @Test
Review comment:
My gut tells me we should be able to parameterize all of these tests around the file name so we can remove a bit of the duplicated code here.
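Sketching the idea (a hypothetical shape only: it reuses the helpers and constants already in this test class, and the enclosing class would need @RunWith(Parameterized.class)):

    // Hypothetical parameterization: the valid-file constant becomes the parameter,
    // while the orphan constant and the assertions stay shared across cases.
    @Parameterized.Parameters(name = "validFilePath = {0}")
    public static Object[] parameters() {
      return new Object[] {
          HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE,
          HDFS_USER_LOG_DATA_DUMMY_FILE
      };
    }

    @Parameterized.Parameter
    public String validFilePath;

    @Test
    public void testFindOrphanFiles() {
      Dataset<Row> validFileDS = RemoveOrphanFilesAction.addRelativePathColumn(
          spark.createDataset(getRowsWithFilePath(validFilePath, 4),
              RowEncoder.apply(constructStructureWithString())));

      List<Row> actualFilesData = new ArrayList<>(
          getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_DUMMY_FILE, 4));
      List<Row> expectedOrphanFiles = getRowsWithFilePath(HDFS_SERVICENAME_USER_LOG_DATA_ORPHAN_FILE, 4);
      actualFilesData.addAll(expectedOrphanFiles);
      Dataset<Row> actualFileDS = RemoveOrphanFilesAction.addRelativePathColumn(
          spark.createDataset(actualFilesData, RowEncoder.apply(constructStructureWithString())));

      List<String> orphanFiles = RemoveOrphanFilesAction.findOrphanFiles(validFileDS, actualFileDS);
      Assert.assertEquals(
          expectedOrphanFiles.stream().map(row -> row.get(0).toString()).collect(Collectors.toList()),
          orphanFiles);
    }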
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]