aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r921340638
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalSchemes));
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalAuthorities));
+
+    Dataset<Row> normalizedActualFileDF = actualFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("actual");
+    Dataset<Row> normalizedValidFileDF = validFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    Dataset<FileDescriptor> mayBeOrphanFilesDF = normalizedActualFileDF.join(normalizedValidFileDF,
+            actualFileName.equalTo(validFileName), "leftouter")
+        .selectExpr("actual.scheme as actualScheme",
+            "actual.authority as actualAuthority",
+            "actual.path as actualPath",
+            "actual.file_path as actualFilePath",
+            "valid.scheme as validScheme",
+            "valid.authority as validAuthority",
+            "valid.path as validPath",
+            "valid.file_path as validFilePath")
+        .as(Encoders.bean(FileDescriptor.class));
+
+    List<String> orphanFiles = mayBeOrphanFilesDF.mapPartitions(
+        new MapOrphanFilesFunction(setAccumulator), Encoders.STRING()).collectAsList();
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !setAccumulator.value().isEmpty()) {
+      throw new ValidationException("Unable to deterministically find all orphan files." +
+          " Found file paths that have same file path but different authorities/schemes. Conflicting" +
+          " authorities/schemes found: %s", setAccumulator.value().toString());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> toBeFlattenedMap) {
Review Comment:
To be honest, I am not sure supporting comma-separated values will be easier for the user. I'd either skip this logic and just support a plain map, or at least support commas in the key rather than in the value (`s3a, s3n` -> `s3`).
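For illustration, a sketch of what that key-side parsing could look like (hypothetical, not code from the PR; it assumes Guava's `Splitter` and `Maps`, with entries such as `"s3a,s3n" -> "s3"`):
```
// Hypothetical sketch: expand comma-separated keys so {"s3a,s3n" -> "s3"}
// becomes {"s3a" -> "s3", "s3n" -> "s3"}.
private static Map<String, String> flattenMap(Map<String, String> map) {
  Map<String, String> flattened = Maps.newHashMap();
  if (map != null) {
    map.forEach((key, value) ->
        Splitter.on(',').trimResults().omitEmptyStrings().split(key)
            .forEach(splitKey -> flattened.put(splitKey, value)));
  }
  return flattened;
}
```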
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalSchemes));
+    equalSchemesAndAuthoritiesMap.putAll(flattenMap(equalAuthorities));
+
+    Dataset<Row> normalizedActualFileDF = actualFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("actual");
+    Dataset<Row> normalizedValidFileDF = validFileDF.mapPartitions(toFileMetadata(equalSchemesAndAuthoritiesMap),
+        RowEncoder.apply(FILE_METADATA_SCHEMA)).as("valid");
+
+    Column actualFileName = normalizedActualFileDF.col("path");
+    Column validFileName = normalizedValidFileDF.col("path");
+
+    Dataset<FileDescriptor> mayBeOrphanFilesDF = normalizedActualFileDF.join(normalizedValidFileDF,
Review Comment:
I think we should be able to use typed `joinWith`.
```
Dataset<FileMetadata> actualFileMetadataDS = actualFileDF
    .mapPartitions(toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
Dataset<FileMetadata> validFileMetadataDS = validFileDF
    .mapPartitions(toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));

Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));

SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
spark.sparkContext().register(conflicts);

List<String> orphanFiles = actualFileMetadataDS
    .joinWith(validFileMetadataDS, joinCond, "leftouter")
    .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
    .collectAsList();
```
```
private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(...)
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -232,6 +259,75 @@ private DeleteOrphanFiles.Result doExecute() {
return new BaseDeleteOrphanFilesActionResult(orphanFiles);
}
+  private static Map<String, String> populateEqualSchemeAndAuthoritiesMap(List<String> equalSchemes,
+                                                                          List<String> equalAuthorities) {
+    Map<String, String> equalSchemeAndAuthoritiesMap = Maps.newHashMap();
+    if (equalSchemes != null && !equalSchemes.isEmpty()) {
+      String firstScheme = equalSchemes.get(0);
+      equalSchemes.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstScheme));
+    }
+
+    if (equalAuthorities != null && !equalAuthorities.isEmpty()) {
+      String firstAuthority = equalAuthorities.get(0);
+      equalAuthorities.forEach(value -> equalSchemeAndAuthoritiesMap.put(value, firstAuthority));
+    }
+    return equalSchemeAndAuthoritiesMap;
+  }
+
+  @VisibleForTesting
+  static List<String> findOrphanFiles(Dataset<Row> actualFileDF,
+                                      Dataset<Row> validFileDF,
+                                      Map<String, String> equalSchemes,
+                                      Map<String, String> equalAuthorities,
+                                      SetAccumulator<Pair<String, String>> setAccumulator,
+                                      PrefixMismatchMode prefixMismatchMode) {
+    Map<String, String> equalSchemesAndAuthoritiesMap = Maps.newHashMap();
Review Comment:
nit: I think it will be cleaner to keep these maps separate.
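A sketch of how the two maps could flow into `toFileMetadata` if they are kept separate (a hypothetical signature, assuming the reviewer's suggested `FileMetadata` bean from the `joinWith` comment above):
```
// Hypothetical: take the two maps separately instead of merging them first.
private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
    Map<String, String> equalSchemes, Map<String, String> equalAuthorities) {
  return rows -> Iterators.transform(rows, row -> {
    String location = row.getString(0);
    URI uri = new Path(location).toUri();
    String scheme = equalSchemes.getOrDefault(uri.getScheme(), uri.getScheme());
    String authority = equalAuthorities.getOrDefault(uri.getAuthority(), uri.getAuthority());
    return new FileMetadata(scheme, authority, uri.getPath(), location);
  });
}
```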
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -94,14 +101,12 @@
    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
+  private static final StructType FILE_METADATA_SCHEMA = new StructType(new StructField[]{
Review Comment:
This won't be needed if we switch to `joinWith`.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/FileDescriptor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+public class FileDescriptor {
Review Comment:
If we switch to `joinWith` in the action, this class can be simplified.
```
public static class FileMetadata {
private String scheme;
private String authority;
private String path;
private String location;
...
}
```
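Note that Spark's `Encoders.bean` expects a public no-arg constructor plus getters and setters for each field; a minimal sketch of what the elided part might include (an assumption, not the reviewer's code):
```
// Hypothetical completion: Encoders.bean needs a no-arg constructor and bean accessors.
public FileMetadata() {
}

public FileMetadata(String scheme, String authority, String path, String location) {
  this.scheme = scheme;
  this.authority = authority;
  this.path = path;
  this.location = location;
}

public String getScheme() {
  return scheme;
}

public void setScheme(String scheme) {
  this.scheme = scheme;
}

// ... the same getter/setter pattern for authority, path, and location
```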
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +239,15 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
 Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    SetAccumulator<Pair<String, String>> setAccumulator = new SetAccumulator<>();
Review Comment:
What about passing `spark()` to `findOrphanFiles` and creating the
accumulator there? That's the method that actually uses it.
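A sketch of that shape (a hypothetical signature combining this with the formatting suggestion further down, not code from the PR):
```
// Hypothetical: findOrphanFiles receives the session and owns its accumulator.
static List<String> findOrphanFiles(SparkSession spark,
                                    Dataset<Row> actualFileDF,
                                    Dataset<Row> validFileDF,
                                    Map<String, String> equalSchemes,
                                    Map<String, String> equalAuthorities,
                                    PrefixMismatchMode prefixMismatchMode) {
  SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
  spark.sparkContext().register(conflicts);
  // ... build the normalized datasets, join, and collect as in the PR
}
```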
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/FileDescriptor.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+public class FileDescriptor {
Review Comment:
I'd also make it static nested class in the action as it is only used in the
action.
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -89,4 +125,13 @@ interface Result {
*/
Iterable<String> orphanFileLocations();
}
+
+  /**
+   * Defines the delete orphan files behavior when there is a prefix (scheme/authority) mismatch.
+   * ERROR - Throws an exception on prefix mismatch
+   * IGNORE - No action on prefix mismatch
+   */
+ enum PrefixMismatchMode {
+ ERROR, IGNORE
Review Comment:
I thought a bit more about this and maybe we do need a `DELETE` mode, even though I was reluctant to support it initially. The behavior would be: by default you get an exception with all conflicting schemes/authorities and are asked to resolve the conflicts. If that's not enough, you can then use either IGNORE or DELETE to handle the remaining conflicts.
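If `DELETE` is added, the enum and its doc might look roughly like this (a sketch of the suggestion, not committed code):
```
/**
 * Defines the action behavior when the scheme/authority prefixes of otherwise
 * matching paths disagree.
 * ERROR - throw an exception listing the conflicting schemes/authorities (default)
 * IGNORE - treat such files as not orphan
 * DELETE - treat such files as orphan and delete them
 */
enum PrefixMismatchMode {
  ERROR, IGNORE, DELETE
}
```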
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +457,51 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static MapPartitionsFunction<Row, Row> toFileMetadata(Map<String, String> equalSchemeAndAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String filePathAsString = row.getString(0);
+      URI uri = new Path(filePathAsString).toUri();
+      String scheme = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return RowFactory.create(scheme, authority, uri.getPath(), filePathAsString);
Review Comment:
This will become:
```
return new FileMetadata(scheme, authority, uri.getPath(), location);
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +457,51 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
       return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
     }
   }
+
+  private static MapPartitionsFunction<Row, Row> toFileMetadata(Map<String, String> equalSchemeAndAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String filePathAsString = row.getString(0);
+      URI uri = new Path(filePathAsString).toUri();
+      String scheme = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalSchemeAndAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return RowFactory.create(scheme, authority, uri.getPath(), filePathAsString);
+    });
+  }
+
+  static class MapOrphanFilesFunction implements MapPartitionsFunction<FileDescriptor, String> {
+
+    private final SetAccumulator<Pair<String, String>> setAcc;
+
+    MapOrphanFilesFunction(SetAccumulator<Pair<String, String>> accumulator) {
+      this.setAcc = accumulator;
+    }
+
+    @Override
+    public Iterator<String> call(Iterator<FileDescriptor> iter) throws Exception {
+      Iterator<String> orphanFilesIterator = Iterators.transform(iter,
+          row -> isOrphan(row) ? row.getActualFilePath() : null);
+      return Iterators.filter(orphanFilesIterator, str -> !Strings.isNullOrEmpty(str));
+    }
+
+    boolean isOrphan(FileDescriptor desc) {
+      if (desc.getValidPath() == null) {
+        return true;
+      }
+      if (!isEqual(desc.getActualScheme(), desc.getValidScheme())) {
+        setAcc.add(Pair.of(desc.getActualScheme(), desc.getValidScheme()));
+      }
+      if (!isEqual(desc.getActualAuthority(), desc.getValidAuthority())) {
+        setAcc.add(Pair.of(desc.getActualAuthority(), desc.getValidAuthority()));
+      }
+      return false;
+    }
+
+    private boolean isEqual(String actual, String valid) {
Review Comment:
It is a little hard to wrap one's head around this logic. Would something like this be easier to grasp?
```
private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
    PrefixMismatchMode mode,
    SetAccumulator<Pair<String, String>> conflicts) {
  return rows -> {
    Iterator<String> transformed = Iterators.transform(rows, row -> {
      FileMetadata actual = row._1;
      FileMetadata valid = row._2;
      if (valid.path == null) {
        return actual.location;
      }
      boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
          valid.scheme.equalsIgnoreCase(actual.scheme);
      boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
          valid.authority.equalsIgnoreCase(actual.authority);
      switch (mode) {
        ...
      }
    });
    return Iterators.filter(transformed, Objects::nonNull);
  };
}
```
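One way the elided `switch` could resolve the modes (an assumption that folds in the `DELETE` discussion above, not the reviewer's actual code):
```
// Hypothetical mode handling for a name match whose prefixes disagree.
if (schemeMatch && authorityMatch) {
  return null;  // same file, not an orphan
}
switch (mode) {
  case DELETE:
    return actual.location;  // treat the conflicting file as an orphan
  case ERROR:
    if (!schemeMatch) {
      conflicts.add(Pair.of(actual.scheme, valid.scheme));
    }
    if (!authorityMatch) {
      conflicts.add(Pair.of(actual.authority, valid.authority));
    }
    return null;  // the action throws later if any conflicts were recorded
  default:  // IGNORE
    return null;  // treat the conflicting file as not orphan
}
```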
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +239,15 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
 Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
+    SetAccumulator<Pair<String, String>> setAccumulator = new SetAccumulator<>();
+    spark().sparkContext().register(setAccumulator);
+    List<String> orphanFiles =
Review Comment:
What about one of the following formatting variants?
```
List<String> orphanFiles = findOrphanFiles(
    spark(), actualFileDF, validFileDF, equalSchemes, equalAuthorities, prefixMismatchMode);
```
```
List<String> orphanFiles = findOrphanFiles(
    spark(), actualFileDF, validFileDF,
    equalSchemes, equalAuthorities, prefixMismatchMode);
```