aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r927017901
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
 private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-   int lastIndex = path.lastIndexOf(File.separator);
-   if (lastIndex == -1) {
-     return path;
-   } else {
-     return path.substring(lastIndex + 1);
-   }
- }, DataTypes.StringType);
-
 private final SerializableConfiguration hadoopConf;
 private final int partitionDiscoveryParallelism;
 private final Table table;
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+
+ private Map<String, String> equalSchemes = ImmutableMap.of();
Review Comment:
Shouldn't this be defaulted to `flattenMap(EQUAL_SCHEMES_DEFAULT)`?
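For example, a one-line sketch of what I mean:
```
private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
```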
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
 private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-   int lastIndex = path.lastIndexOf(File.separator);
-   if (lastIndex == -1) {
-     return path;
-   } else {
-     return path.substring(lastIndex + 1);
-   }
- }, DataTypes.StringType);
-
 private final SerializableConfiguration hadoopConf;
 private final int partitionDiscoveryParallelism;
 private final Table table;
+ private static final Splitter COMMA = Splitter.on(",");
+ private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+
+ private Map<String, String> equalSchemes = ImmutableMap.of();
+ private Map<String, String> equalAuthorities = ImmutableMap.of();
Review Comment:
Let's not use immutable collections to avoid Kryo issues. It is OK to pass
an immutable map to `flattenMap` as it will create a mutable version under the
hood.
This should be:
```
private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
private Map<String, String> equalAuthorities = Collections.emptyMap();
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +148,27 @@ public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorSe
   return this;
 }
+ @Override
+ public DeleteOrphanFiles newPrefixMismatchMode(PrefixMismatchMode mismatchMode) {
Review Comment:
Earlier I meant using the `new***` pattern for args, not methods.
```
public DeleteOrphanFilesSparkAction prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
  ...
}
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
 private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
- private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-   int lastIndex = path.lastIndexOf(File.separator);
-   if (lastIndex == -1) {
-     return path;
-   } else {
-     return path.substring(lastIndex + 1);
-   }
- }, DataTypes.StringType);
-
 private final SerializableConfiguration hadoopConf;
 private final int partitionDiscoveryParallelism;
 private final Table table;
+ private static final Splitter COMMA = Splitter.on(",");
Review Comment:
Static vars should be grouped together just like mutable and immutable
instance variables.
The correct ordering would be like this:
```
private static final Logger LOG = ...;
private static final Splitter COMMA = ...;
private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ...;
private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
private final Consumer<String> defaultDelete = ...;
private String location = ...;
private long olderThanTimestamp = ...;
...
private PrefixMismatchMode prefixMismatchMode = ...;
private Map<String, String> equalSchemes = ...;
private Map<String, String> equalAuthorities = ...;
```
That way, we first have class vars, then immutable instance vars, and then
mutable instance vars, with each block separated by an empty line.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+         Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+         PrefixMismatchMode prefixMismatchMode) {
+   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+
+   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+   spark.sparkContext().register(conflicts);
+
+   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+   List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS,
+       joinCond, "leftouter")
+       .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+       .collectAsList();
+
+   if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+     throw new ValidationException("Unable to determine whether certain files are orphan. " +
+             "Metadata references files that match listed/provided files except for authority/scheme. " +
+             "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+             "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+             "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+             "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+             "authorities/schemes are different. It will be impossible to recover deleted files. " +
+             "Conflicting authorities/schemes: %s.", conflicts.value());
+   }
+   return orphanFiles;
+ }
+
+ private static Map<String, String> flattenMap(Map<String, String> map) {
+   Map<String, String> flattenedMap = Maps.newHashMap();
+   if (map != null) {
+     for (String key : map.keySet()) {
+       String value = map.get(key);
+       for (String splitKey : COMMA.split(key)) {
+         flattenedMap.put(splitKey.trim(), value.trim());
+       }
+     }
+   }
+   return flattenedMap;
+ }
+
+ private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+     PrefixMismatchMode mode,
+     SetAccumulator<Pair<String, String>> conflicts) {
+   return rows -> {
+     Iterator<String> transformed = Iterators.transform(rows, row -> {
+       FileMetadata actual = row._1;
+       FileMetadata valid = row._2;
+       if (valid == null) {
+         return actual.filePath;
+       }
+       boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+           valid.scheme.equalsIgnoreCase(actual.scheme);
+       boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+           valid.authority.equalsIgnoreCase(actual.authority);
+       if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
Review Comment:
nit: Can we add an empty line before this if statement as well?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -213,14 +239,8 @@ private DeleteOrphanFiles.Result doExecute() {
   Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
   Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-  Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-  Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-  Column nameEqual = actualFileName.equalTo(validFileName);
-  Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-  Column joinCond = nameEqual.and(actualContains);
-  List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-      .as(Encoders.STRING())
-      .collectAsList();
+  List<String> orphanFiles = findOrphanFiles(spark(), actualFileDF, validFileDF,
+          equalSchemes, equalAuthorities, prefixMismatchMode);
Review Comment:
nit: this line uses 8 spaces of continuation indentation. Iceberg uses 4-space
continuation indentation (when a single statement is split across multiple
lines).
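For example, the same statement with 4-space continuation would look like this (sketch):
```
List<String> orphanFiles = findOrphanFiles(spark(), actualFileDF, validFileDF,
    equalSchemes, equalAuthorities, prefixMismatchMode);
```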
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,53 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
  */
 DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ /**
+  * Passes a prefix mismatch mode that determines how this action should handle situations when
+  * the metadata references files that match listed/provided files except for authority/scheme.
+  * <p>
+  * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
+  * which means an exception is thrown whenever there is a mismatch in authority/scheme.
+  * It's the recommended mismatch mode and should be changed only in some rare circumstances.
+  * If there is a mismatch, use {@link #newEqualSchemes(Map)} (Map)} and {@link #newEqualAuthorities(Map)} (Map)}
Review Comment:
nit: looks like typos, should be `{@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)}`
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +471,53 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
   return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
  }
 }
+
+ public static class FileMetadata implements Serializable {
+   private String scheme;
+   private String authority;
+   private String path;
+   private String filePath;
Review Comment:
I'd probably call it `location` as it is unclear how `filePath` differs from
`path`.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+         Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+         PrefixMismatchMode prefixMismatchMode) {
+   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+
+   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+   spark.sparkContext().register(conflicts);
+
+   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+   List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS,
+       joinCond, "leftouter")
+       .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+       .collectAsList();
+
+   if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+     throw new ValidationException("Unable to determine whether certain files are orphan. " +
+             "Metadata references files that match listed/provided files except for authority/scheme. " +
Review Comment:
nit: this also uses 8 spaces for indentation instead of 4
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+         Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+         PrefixMismatchMode prefixMismatchMode) {
+   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+
+   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+   spark.sparkContext().register(conflicts);
+
+   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+   List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS,
+       joinCond, "leftouter")
+       .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+       .collectAsList();
+
+   if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+     throw new ValidationException("Unable to determine whether certain files are orphan. " +
+             "Metadata references files that match listed/provided files except for authority/scheme. " +
+             "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+             "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+             "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+             "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+             "authorities/schemes are different. It will be impossible to recover deleted files. " +
+             "Conflicting authorities/schemes: %s.", conflicts.value());
+   }
+   return orphanFiles;
+ }
+
+ private static Map<String, String> flattenMap(Map<String, String> map) {
+   Map<String, String> flattenedMap = Maps.newHashMap();
+   if (map != null) {
+     for (String key : map.keySet()) {
+       String value = map.get(key);
+       for (String splitKey : COMMA.split(key)) {
+         flattenedMap.put(splitKey.trim(), value.trim());
+       }
+     }
+   }
+   return flattenedMap;
+ }
+
+ private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+     PrefixMismatchMode mode,
+     SetAccumulator<Pair<String, String>> conflicts) {
+   return rows -> {
+     Iterator<String> transformed = Iterators.transform(rows, row -> {
+       FileMetadata actual = row._1;
+       FileMetadata valid = row._2;
+       if (valid == null) {
+         return actual.filePath;
+       }
+       boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+           valid.scheme.equalsIgnoreCase(actual.scheme);
+       boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+           valid.authority.equalsIgnoreCase(actual.authority);
+       if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+         return actual.filePath;
+       } else {
+         if (!schemeMatch) {
+           conflicts.add(Pair.of(valid.scheme, actual.scheme));
+         }
+         if (!authorityMatch) {
+           conflicts.add(Pair.of(valid.authority, actual.authority));
+         }
+       }
+       return null;
+     });
+     return Iterators.filter(transformed, Objects::nonNull);
+   };
+ }
+
+ private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+     Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
+   return rows -> Iterators.transform(rows, row -> {
+     String filePathAsString = row.getString(0);
Review Comment:
What about calling this `location` and also renaming `filePath` ->
`location` in `FileMetadata`?
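Something like this (sketch):
```
return rows -> Iterators.transform(rows, row -> {
  String location = row.getString(0);
  ...
});
```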
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+         Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+         PrefixMismatchMode prefixMismatchMode) {
+   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+
+   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+   spark.sparkContext().register(conflicts);
+
+   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+   List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS,
+       joinCond, "leftouter")
+       .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+       .collectAsList();
+
+   if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+     throw new ValidationException("Unable to determine whether certain files are orphan. " +
+             "Metadata references files that match listed/provided files except for authority/scheme. " +
+             "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+             "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+             "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+             "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+             "authorities/schemes are different. It will be impossible to recover deleted files. " +
+             "Conflicting authorities/schemes: %s.", conflicts.value());
+   }
+   return orphanFiles;
+ }
+
+ private static Map<String, String> flattenMap(Map<String, String> map) {
+   Map<String, String> flattenedMap = Maps.newHashMap();
+   if (map != null) {
+     for (String key : map.keySet()) {
+       String value = map.get(key);
+       for (String splitKey : COMMA.split(key)) {
+         flattenedMap.put(splitKey.trim(), value.trim());
+       }
+     }
+   }
+   return flattenedMap;
+ }
+
+ private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+     PrefixMismatchMode mode,
+     SetAccumulator<Pair<String, String>> conflicts) {
+   return rows -> {
+     Iterator<String> transformed = Iterators.transform(rows, row -> {
+       FileMetadata actual = row._1;
+       FileMetadata valid = row._2;
+       if (valid == null) {
Review Comment:
nit: Can we add an empty line before and after this if block to separate it
out?
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+         Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+         PrefixMismatchMode prefixMismatchMode) {
+   Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+   Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+       toFileMetadata(equalSchemes, equalAuthorities), Encoders.bean(FileMetadata.class));
+
+   SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+   spark.sparkContext().register(conflicts);
+
+   Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+   List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS,
+       joinCond, "leftouter")
Review Comment:
nit: this can fit on one line
```
List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
    ...
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +471,53 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
   return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
  }
 }
+
+ public static class FileMetadata implements Serializable {
+   private String scheme;
+   private String authority;
+   private String path;
+   private String filePath;
+
+   public FileMetadata(String scheme, String authority, String path, String filePathAsString) {
Review Comment:
`filePathAsString` -> `location`
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -361,4 +471,53 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
   return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
  }
 }
+
+ public static class FileMetadata implements Serializable {
+   private String scheme;
+   private String authority;
+   private String path;
+   private String filePath;
+
+   public FileMetadata(String scheme, String authority, String path, String filePathAsString) {
+     this.scheme = scheme;
+     this.authority = authority;
+     this.path = path;
+     this.filePath = filePathAsString;
+   }
+
+   public FileMetadata() {
+   }
+
+   public void setScheme(String scheme) {
+     this.scheme = scheme;
+   }
+
+   public void setAuthority(String authority) {
+     this.authority = authority;
+   }
+
+   public void setPath(String path) {
+     this.path = path;
+   }
+
+   public void setFilePath(String filePath) {
+     this.filePath = filePath;
+   }
+
+   public String getScheme() {
+     return scheme;
+   }
+
+   public String getAuthority() {
+     return authority;
+   }
+
+   public String getPath() {
+     return path;
+   }
+
+   public String getFilePath() {
Review Comment:
Getters/setters will have to be renamed as well.
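For example, the renamed bean property would look like this (sketch; `Encoders.bean` derives column names from the getters, so they must stay consistent with the field):
```
public String getLocation() {
  return location;
}

public void setLocation(String location) {
  this.location = location;
}
```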
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -51,7 +55,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
      ProcedureParameter.optional("location", DataTypes.StringType),
      ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
      ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
-     ProcedureParameter.optional("file_list_view", DataTypes.StringType)
+     ProcedureParameter.optional("file_list_view", DataTypes.StringType),
+     ProcedureParameter.optional("equal_schemes", STRING_MAP),
+     ProcedureParameter.optional("equal_authorities", STRING_MAP),
+     ProcedureParameter.optional("prefix_mismatch_mode", STRING_MAP),
Review Comment:
I don't think it should be `STRING_MAP`. I think it should be
`DataTypes.StringType`. Can we also add tests covering this?
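That is, something like:
```
ProcedureParameter.optional("prefix_mismatch_mode", DataTypes.StringType),
```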
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -143,6 +148,27 @@ public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorSe
   return this;
 }
+ @Override
+ public DeleteOrphanFiles newPrefixMismatchMode(PrefixMismatchMode mismatchMode) {
+   this.prefixMismatchMode = mismatchMode;
+   return this;
+ }
+
+ @Override
+ public DeleteOrphanFiles newEqualSchemes(Map<String, String> schemes) {
Review Comment:
We should return `DeleteOrphanFilesSparkAction` in all new methods in the
implementation class.
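For example, combining this with the earlier naming suggestion (just a sketch):
```
@Override
public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> newEqualSchemes) {
  this.equalSchemes = flattenMap(newEqualSchemes);
  return this;
}
```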
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -93,6 +100,26 @@ public InternalRow[] call(InternalRow args) {
   Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
       "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
+  Map<String, String> equalSchemes = Maps.newHashMap();
Review Comment:
nit: let's add an empty line before these blocks
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -93,6 +100,26 @@ public InternalRow[] call(InternalRow args) {
   Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
       "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
+  Map<String, String> equalSchemes = Maps.newHashMap();
+  if (!args.isNullAt(6)) {
+    args.getMap(6).foreach(DataTypes.StringType, DataTypes.StringType,
+        (k, v) -> {
+          equalSchemes.put(k.toString(), v.toString());
+          return BoxedUnit.UNIT;
+        });
+  }
+
+  Map<String, String> equalAuthorities = Maps.newHashMap();
+  if (!args.isNullAt(7)) {
+    args.getMap(7).foreach(DataTypes.StringType, DataTypes.StringType,
+        (k, v) -> {
+          equalAuthorities.put(k.toString(), v.toString());
+          return BoxedUnit.UNIT;
+        });
+  }
+
+  String prefixMismatchMode = args.isNullAt(8) ?
+      DeleteOrphanFiles.PrefixMismatchMode.ERROR.toString() : args.getString(8);
Review Comment:
I don't think we should default it in the procedure. If it is null, just
don't set it (see below).
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +346,96 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
   };
 }
+ @VisibleForTesting
+ static List<String> findOrphanFiles(
+         SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
Review Comment:
nit: this also uses 8 spaces instead of 4
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +956,114 @@ protected long waitUntilAfter(long timestampMillis) {
   }
   return current;
 }
+
+ @Test
+ public void testPathsWithExtraSlashes() {
+   List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithValidFileHavingNoAuthority() {
+   List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithActualFileHavingNoAuthority() {
+   List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithEqualSchemes() {
+   List<String> validFiles = Lists.newArrayList("s3n://bucket1/dir1/dir2/file1");
Review Comment:
I feel like we should handle `s3n/s3a/s3` by default since they are obvious.
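That is, with `EQUAL_SCHEMES_DEFAULT` applied out of the box, a case like this should pass without any explicit configuration (sketch, reusing the short test helper below):
```
executeTest(validFiles, actualFiles, Lists.newArrayList());
```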
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -120,6 +147,13 @@ public InternalRow[] call(InternalRow args) {
   if (fileListView != null) {
     action.compareToFileList(spark().table(fileListView));
   }
+  Preconditions.checkArgument(prefixMismatchMode == null ||
Review Comment:
I feel like this can be simplified a bit.
```
PrefixMismatchMode prefixMismatchMode = args.isNullAt(8) ? null : PrefixMismatchMode.fromString(args.getString(8));
```
Then we can call it like this:
```
if (prefixMismatchMode != null) {
  action.prefixMismatchMode(prefixMismatchMode);
}
```
You will need to add a method to the `PrefixMismatchMode` enum.
```
public static PrefixMismatchMode fromString(String modeAsString) {
  Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
  return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
}
```
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -955,4 +956,114 @@ protected long waitUntilAfter(long timestampMillis) {
   }
   return current;
 }
+
+ @Test
+ public void testPathsWithExtraSlashes() {
+   List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("file:///dir1/////dir2///file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithValidFileHavingNoAuthority() {
+   List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithActualFileHavingNoAuthority() {
+   List<String> validFiles = Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
+   executeTest(validFiles, actualFiles, Lists.newArrayList());
+ }
+
+ @Test
+ public void testPathsWithEqualSchemes() {
+   List<String> validFiles = Lists.newArrayList("s3n://bucket1/dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("s3a://bucket1/dir1/dir2/file1");
+   AssertHelpers.assertThrows("Test remove orphan files with equal schemes",
+       ValidationException.class,
+       "Conflicting authorities/schemes: [(s3n, s3a)]",
+       () -> executeTest(validFiles,
+           actualFiles,
+           Lists.newArrayList(),
+           ImmutableMap.of(),
+           ImmutableMap.of(),
+           DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+   Map<String, String> equalSchemes = Maps.newHashMap();
+   equalSchemes.put("s3n", "s3");
+   equalSchemes.put("s3a", "s3");
+   executeTest(validFiles,
+       actualFiles,
+       Lists.newArrayList(),
+       equalSchemes,
+       ImmutableMap.of(),
+       DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testPathsWithEqualAuthorities() {
+   List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+   AssertHelpers.assertThrows("Test remove orphan files with equal authorities",
+       ValidationException.class,
+       "Conflicting authorities/schemes: [(servicename1, servicename2)]",
+       () -> executeTest(validFiles,
+           actualFiles,
+           Lists.newArrayList(),
+           ImmutableMap.of(),
+           ImmutableMap.of(),
+           DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+
+   Map<String, String> equalAuthorities = Maps.newHashMap();
+   equalAuthorities.put("servicename1", "servicename");
+   equalAuthorities.put("servicename2", "servicename");
+   executeTest(validFiles,
+       actualFiles,
+       Lists.newArrayList(),
+       ImmutableMap.of(),
+       equalAuthorities,
+       DeleteOrphanFiles.PrefixMismatchMode.ERROR);
+ }
+
+ @Test
+ public void testRemoveOrphanFileActionWithDeleteMode() {
+   List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
+   List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
+
+   executeTest(validFiles,
+       actualFiles,
+       Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1"),
+       ImmutableMap.of(),
+       ImmutableMap.of(),
+       DeleteOrphanFiles.PrefixMismatchMode.DELETE);
+ }
+
+ private void executeTest(List<String> validFiles,
+                          List<String> actualFiles,
+                          List<String> expectedOrphanFiles) {
+   executeTest(validFiles, actualFiles, expectedOrphanFiles, ImmutableMap.of(), ImmutableMap.of(),
+       DeleteOrphanFiles.PrefixMismatchMode.IGNORE);
+ }
+
+ private void executeTest(List<String> validFiles,
+                          List<String> actualFiles,
+                          List<String> expectedOrphanFiles,
+                          Map<String, String> equalSchemes,
+                          Map<String, String> equalAuthorities,
+                          DeleteOrphanFiles.PrefixMismatchMode mode) {
+   Dataset<Row> validFilesDF = spark.createDataset(validFiles, Encoders.STRING()).toDF();
+   Dataset<Row> actualFilesDF = spark.createDataset(actualFiles, Encoders.STRING()).toDF();
+
+   List<String> orphanFiles = BaseDeleteOrphanFilesSparkAction.findOrphanFiles(
Review Comment:
nit: We can use `DeleteOrphanFilesSparkAction` now
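For example (sketch):
```
List<String> orphanFiles = DeleteOrphanFilesSparkAction.findOrphanFiles(
    spark, actualFilesDF, validFilesDF, equalSchemes, equalAuthorities, mode);
```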
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]