rdblue commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r889411806
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +403,90 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
}
}
+
+ private static Dataset<Row> sanitizeAndSplitPaths(Dataset<Row> filesDF) {
+ return filesDF.mapPartitions((MapPartitionsFunction<Row, Row>) input ->
+ Iterators.transform(input, row -> {
+ String pathString = row.getString(0);
+ Path path = new Path(pathString);
+ URI uri = path.toUri();
+ List<Object> values = Lists.newArrayList(uri.getScheme(), uri.getAuthority(), uri.getPath(), pathString);
+ return Row$.MODULE$.apply(scala.collection.JavaConverters.asScalaBuffer(values).toSeq());
+ }), RowEncoder.apply(fileSchema));
+ }
+ static class MapOrphanFilesFunction implements MapPartitionsFunction<Row, String> {
+
+ private final List<String> equivalentSchemes;
+ private final List<String> equivalentAuthorities;
+ private final PrefixMisMatchMode mismatchMode;
+ private final StructType scheme;
+
+ MapOrphanFilesFunction(List<String> equivalentSchemes,
+ List<String> equivalentAuthorities,
+ PrefixMisMatchMode mismatchMode,
+ StructType schema) {
+ this.equivalentSchemes = equivalentSchemes;
+ this.equivalentAuthorities = equivalentAuthorities;
+ this.mismatchMode = mismatchMode;
+ this.scheme = schema;
+ }
+
+ @Override
+ public Iterator<String> call(Iterator<Row> value) throws Exception {
+
+ Iterator<String> orphanFilesIterator = Iterators.transform(value, row -> {
+ if (isOrphan(row)) {
+ return row.getString(3);
+ } else {
+ return null;
+ }
+ });
+ return Iterators.filter(orphanFilesIterator, StringUtils::isNotBlank);
+ }
+
+ boolean isOrphan(Row row) {
+ String[] fields = scheme.fieldNames();
+
+ // actual file related fields
+ assert (fields[0].equalsIgnoreCase("scheme"));
+ String actualScheme = row.getString(0);
+ assert (fields[1].equalsIgnoreCase("authority"));
+ String actualAuthority = row.getString(1);
+ assert (fields[3].equalsIgnoreCase("file_path"));
+ String actualFullPath = row.getString(3);
+
+ // valid files related fields
+ assert (fields[4].equalsIgnoreCase("scheme"));
+ String validScheme = row.getString(4);
+ assert (fields[5].equalsIgnoreCase("authority"));
+ String validAuthority = row.getString(5);
+ assert (fields[6].equalsIgnoreCase("path"));
+ String validPath = row.getString(6);
+ assert (fields[7].equalsIgnoreCase("file_path"));
+ String validFullPath = row.getString(7);
+ if (validPath == null) {
+ return true;
+ }
+ boolean isEquivalent = isEquivalent(equivalentSchemes, actualScheme, validScheme) &&
+ isEquivalent(equivalentAuthorities, actualAuthority, validAuthority);
+ if (!isEquivalent && mismatchMode == PrefixMisMatchMode.ERROR) {
Review Comment:
@aokolnychyi, should this throw an exception on the task side? Failing the
job this way will likely produce a series of task errors and retries that are
hard for people to understand. Doing better isn't trivial, though: we'd need to
produce a DataFrame with an `is_error` column (or something similar) so the
mismatch can be reported without failing tasks. We may want to leave it as-is.
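
To make the `is_error` idea concrete, here is a minimal sketch in plain Java.
It is not the PR's code: plain collections stand in for the Spark DataFrame,
and `PrefixMismatchSketch`, `Status`, `JoinedRow`, `classify`, and
`mismatches` are hypothetical names. Authority equality is also a simplifying
assumption in place of the real equivalence check. The task-side function
labels each row instead of throwing, and the caller collects the mismatches
and reports them together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class PrefixMismatchSketch {
  // Hypothetical per-row outcome; plays the role of an `is_error` column.
  enum Status { VALID, ORPHAN, PREFIX_MISMATCH }

  // One joined row: the actual file plus the valid file it matched on path, if any.
  static final class JoinedRow {
    final String actualAuthority;
    final String actualFullPath;
    final String validAuthority;
    final String validPath;

    JoinedRow(String actualAuthority, String actualFullPath,
              String validAuthority, String validPath) {
      this.actualAuthority = actualAuthority;
      this.actualFullPath = actualFullPath;
      this.validAuthority = validAuthority;
      this.validPath = validPath;
    }
  }

  // Task-side logic: classify the row instead of throwing.
  static Status classify(JoinedRow row) {
    if (row.validPath == null) {
      return Status.ORPHAN; // no valid file shares this path
    }
    // Assumption: plain equality stands in for the real equivalence check.
    return Objects.equals(row.actualAuthority, row.validAuthority)
        ? Status.VALID
        : Status.PREFIX_MISMATCH;
  }

  // Caller-side logic: gather every mismatch so it can be reported once.
  static List<String> mismatches(List<JoinedRow> rows) {
    List<String> conflicts = new ArrayList<>();
    for (JoinedRow row : rows) {
      if (classify(row) == Status.PREFIX_MISMATCH) {
        conflicts.add(row.actualFullPath);
      }
    }
    return conflicts;
  }

  public static void main(String[] args) {
    List<JoinedRow> rows = List.of(
        new JoinedRow("b1", "s3://b1/a.parquet", "b1", "/a.parquet"),
        new JoinedRow("b1", "s3://b1/b.parquet", "b2", "/b.parquet"),
        new JoinedRow("b1", "s3://b1/c.parquet", null, null));
    List<String> conflicts = mismatches(rows);
    if (!conflicts.isEmpty()) {
      System.out.println("Prefix mismatch for: " + conflicts);
    }
  }
}
```

The trade-off is an extra pass over the result to look for mismatches, which
is part of why this may not be worth the added complexity.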

Also, I want to note one thing about the join type here. If a file has two
matches, the left outer join will produce two joined rows, and one could fail
the equivalence check while the other succeeds. For example, if the valid
files were `s3://b1/path.parquet` and `s3://b2/path.parquet` and the actual
data file was `s3://b1/path.parquet`, then the row joined with the second
valid file (`b1 != b2`) would throw an exception even though there isn't a
problem. This can only happen if the path component is identical for two valid
paths, so I think we can probably ignore it. But I wanted to at least point it
out and see what you think.
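
The duplicate-match case can be reproduced with a small plain-Java sketch. It
assumes the join key is the path component alone and that authorities are
compared by equality; `DuplicatePathJoinSketch`, `FileRef`, `joinOnPath`,
`anyRowMismatches`, and `everyRowMismatches` are hypothetical names, not part
of the PR. The first check mirrors a per-row test on the joined rows; the
second is one possible alternative that flags a file only when none of its
matches is equivalent.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicatePathJoinSketch {
  // A file location reduced to the two parts that matter here.
  static final class FileRef {
    final String authority;
    final String path;

    FileRef(String authority, String path) {
      this.authority = authority;
      this.path = path;
    }
  }

  // Stand-in for the join: every valid file that shares the actual file's path.
  static List<FileRef> joinOnPath(FileRef actual, List<FileRef> validFiles) {
    List<FileRef> matches = new ArrayList<>();
    for (FileRef valid : validFiles) {
      if (valid.path.equals(actual.path)) {
        matches.add(valid);
      }
    }
    return matches;
  }

  // Per-row check: a single joined row with a different authority is an error.
  static boolean anyRowMismatches(FileRef actual, List<FileRef> validFiles) {
    for (FileRef valid : joinOnPath(actual, validFiles)) {
      if (!valid.authority.equals(actual.authority)) {
        return true;
      }
    }
    return false;
  }

  // Hypothetical per-file check: an error only if no joined row is equivalent.
  static boolean everyRowMismatches(FileRef actual, List<FileRef> validFiles) {
    List<FileRef> matches = joinOnPath(actual, validFiles);
    if (matches.isEmpty()) {
      return false; // no match at all means orphan, not a prefix mismatch
    }
    for (FileRef valid : matches) {
      if (valid.authority.equals(actual.authority)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<FileRef> validFiles = List.of(
        new FileRef("b1", "/path.parquet"),
        new FileRef("b2", "/path.parquet"));
    FileRef actual = new FileRef("b1", "/path.parquet");

    System.out.println("per-row check flags an error:  " + anyRowMismatches(actual, validFiles));
    System.out.println("per-file check flags an error: " + everyRowMismatches(actual, validFiles));
  }
}
```

With the two valid files from the example, the per-row check reports an error
for `s3://b1/path.parquet` while the per-file check does not. In Spark the
per-file variant would need a grouping step after the join, so ignoring the
case may still be the more practical choice.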
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]