aokolnychyi commented on code in PR #4652:
URL: https://github.com/apache/iceberg/pull/4652#discussion_r930431757
##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java:
##########
@@ -400,4 +409,74 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
.collectAsList();
Assert.assertEquals("Rows must match", records, actualRecords);
}
+
+ @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    } else {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'", tableName,
+          temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
Review Comment:
nit: I feel the code in the added test is formatted with 4/8 spaces instead of 2/4.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,25 +102,21 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+
   private final Consumer<String> defaultDelete = new Consumer<String>() {
     @Override
     public void accept(String file) {
       table.io().deleteFile(file);
     }
   };
-
+  private Map<String, String> equalSchemes = Collections.emptyMap();
Review Comment:
Should it be `flattenMap(EQUAL_SCHEMES_DEFAULT)` if the user does not set equal schemes?
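Something like this, presumably (a sketch; `flattenMap` and `EQUAL_SCHEMES_DEFAULT` are the static members added in this diff):
```
// fall back to the built-in s3n/s3a -> s3 equivalence instead of an empty map
private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
```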
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,53 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+  /**
+   * Passes a prefix mismatch mode that determines how this action should handle situations when
+   * the metadata references files that match listed/provided files except for authority/scheme.
+   * <p>
+   * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
+   * which means an exception is thrown whenever there is a mismatch in authority/scheme.
+   * It's the recommended mismatch mode and should be changed only in some rare circumstances.
+   * If there is a mismatch, use {@link #newEqualSchemes(Map)} (Map)} and {@link #newEqualAuthorities(Map)} (Map)}
Review Comment:
Looks like this was missed.
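Presumably the links should reference the renamed setters, without the stray `(Map)}` tokens, roughly:
```
 * If there is a mismatch, use {@link #equalSchemes(Map)} and {@link #equalAuthorities(Map)}
 * to resolve conflicting authorities/schemes.
```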
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -121,6 +148,12 @@ public InternalRow[] call(InternalRow args) {
       action.compareToFileList(spark().table(fileListView));
     }
+    action.equalSchemes(equalSchemes);
+    action.equalAuthorities(equalAuthorities);
+    if (prefixMismatchMode != null) {
Review Comment:
nit: let's add an empty line before the if statement to match all other if statements above.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");
+
+  private Map<String, String> equalSchemes = ImmutableMap.of();
Review Comment:
What if the user does not set equal schemes?
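One option is to merge the defaults in the setter so that user-provided entries extend rather than replace them (a sketch; whether extend or replace is desired is exactly the open question here):
```
@Override
public DeleteOrphanFilesSparkAction equalSchemes(Map<String, String> newEqualSchemes) {
  // start from the built-in equivalences, then let user-provided entries win
  this.equalSchemes = Maps.newHashMap();
  equalSchemes.putAll(flattenMap(EQUAL_SCHEMES_DEFAULT));
  equalSchemes.putAll(flattenMap(newEqualSchemes));
  return this;
}
```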
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -120,6 +147,13 @@ public InternalRow[] call(InternalRow args) {
     if (fileListView != null) {
       action.compareToFileList(spark().table(fileListView));
     }
+    Preconditions.checkArgument(prefixMismatchMode == null ||
Review Comment:
This will force the user to match the upper case expected by `valueOf`. The solution above is a bit more flexible. We do that in many other enums.
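For reference, the pattern used in other Iceberg enums would look roughly like this on `PrefixMismatchMode` (a sketch; the method body and message wording are assumptions):
```
// case-insensitive parsing so users can pass 'error', 'Error', or 'ERROR'
public static PrefixMismatchMode fromString(String modeAsString) {
  Preconditions.checkArgument(modeAsString != null, "Mode should not be null");
  try {
    return PrefixMismatchMode.valueOf(modeAsString.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(String.format("Unable to parse mode: %s", modeAsString), e);
  }
}
```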
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,6 +81,53 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    */
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+  /**
+   * Passes a prefix mismatch mode that determines how this action should handle situations when
+   * the metadata references files that match listed/provided files except for authority/scheme.
+   * <p>
+   * Possible values are "ERROR", "IGNORE", "DELETE". The default mismatch mode is "ERROR",
+   * which means an exception is thrown whenever there is a mismatch in authority/scheme.
+   * It's the recommended mismatch mode and should be changed only in some rare circumstances.
+   * If there is a mismatch, use {@link #equalSchemes(Map)} (Map)} and {@link #equalAuthorities(Map)} (Map)}
Review Comment:
nit: broken Javadoc
##########
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java:
##########
@@ -400,4 +409,74 @@ public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesProcedureWithPrefixMode() throws NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    } else {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'", tableName,
+          temp.newFolder().toURI().toString());
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    String location = table.location();
+    Path originalPath = new Path(location);
+
+    URI uri = originalPath.toUri();
+    Path newParentPath = new Path("file1", uri.getAuthority(), uri.getPath());
+
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+
+    table.newFastAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .commit();
+
+    List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
Review Comment:
It is a little bit hard to read. I think you can simplify it by using for loops and slightly different formatting.
```
List<FilePathLastModifiedRecord> allFiles = Lists.newArrayList(
    new FilePathLastModifiedRecord(
        new Path(originalPath, "path/to/data-a.parquet").toString(),
        LAST_MODIFIED_TIMESTAMP),
    new FilePathLastModifiedRecord(
        new Path(originalPath, "path/to/data-b.parquet").toString(),
        LAST_MODIFIED_TIMESTAMP),
    new FilePathLastModifiedRecord(
        ReachableFileUtil.versionHintLocation(table),
        LAST_MODIFIED_TIMESTAMP));

for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
  allFiles.add(new FilePathLastModifiedRecord(file, LAST_MODIFIED_TIMESTAMP));
}

for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
  allFiles.add(new FilePathLastModifiedRecord(manifest.path(), LAST_MODIFIED_TIMESTAMP));
}
```
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +345,99 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
     };
   }
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+        .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+      throw new ValidationException("Unable to determine whether certain files are orphan. " +
+          "Metadata references files that match listed/provided files except for authority/scheme. " +
+          "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+          "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+          "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+          "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+          "authorities/schemes are different. It will be impossible to recover deleted files. " +
+          "Conflicting authorities/schemes: %s.", conflicts.value());
+    }
+    return orphanFiles;
+  }
+
+  private static Map<String, String> flattenMap(Map<String, String> map) {
+    Map<String, String> flattenedMap = Maps.newHashMap();
+    if (map != null) {
+      for (String key : map.keySet()) {
+        String value = map.get(key);
+        for (String splitKey : COMMA.split(key)) {
+          flattenedMap.put(splitKey.trim(), value.trim());
+        }
+      }
+    }
+    return flattenedMap;
+  }
+
+  private static MapPartitionsFunction<Tuple2<FileMetadata, FileMetadata>, String> findOrphanFiles(
+      PrefixMismatchMode mode,
+      SetAccumulator<Pair<String, String>> conflicts) {
+    return rows -> {
+      Iterator<String> transformed = Iterators.transform(rows, row -> {
+        FileMetadata actual = row._1;
+        FileMetadata valid = row._2;
+
+        if (valid == null) {
+          return actual.location;
+        }
+
+        boolean schemeMatch = Strings.isNullOrEmpty(valid.scheme) ||
+            valid.scheme.equalsIgnoreCase(actual.scheme);
+        boolean authorityMatch = Strings.isNullOrEmpty(valid.authority) ||
+            valid.authority.equalsIgnoreCase(actual.authority);
+
+        if ((!schemeMatch || !authorityMatch) && mode == PrefixMismatchMode.DELETE) {
+          return actual.location;
+        } else {
+          if (!schemeMatch) {
+            conflicts.add(Pair.of(valid.scheme, actual.scheme));
+          }
+          if (!authorityMatch) {
+            conflicts.add(Pair.of(valid.authority, actual.authority));
+          }
+        }
+
+        return null;
+      });
+      return Iterators.filter(transformed, Objects::nonNull);
+    };
+  }
+
+  private static MapPartitionsFunction<Row, FileMetadata> toFileMetadata(
+      Map<String, String> equalSchemesMap, Map<String, String> equalAuthoritiesMap) {
+    return rows -> Iterators.transform(rows, row -> {
+      String location = row.getString(0);
+      URI uri = new Path(location).toUri();
+      String scheme = equalSchemesMap.getOrDefault(uri.getScheme(), uri.getScheme());
+      String authority = equalAuthoritiesMap.getOrDefault(uri.getAuthority(), uri.getAuthority());
+      return new FileMetadata(scheme, authority, uri.getPath(), location);
+    });
+  }
+
+
Review Comment:
nit: extra empty line
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -326,6 +345,99 @@ private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
     };
   }
+  @VisibleForTesting
+  static List<String> findOrphanFiles(
+      SparkSession spark, Dataset<Row> actualFileDF, Dataset<Row> validFileDF,
+      Map<String, String> equalSchemes, Map<String, String> equalAuthorities,
+      PrefixMismatchMode prefixMismatchMode) {
+    Dataset<FileMetadata> actualFileMetadataDS = actualFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+    Dataset<FileMetadata> validFileMetadataDS = validFileDF.mapPartitions(
+        toFileMetadata(equalSchemes, equalAuthorities),
+        Encoders.bean(FileMetadata.class));
+
+    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+    spark.sparkContext().register(conflicts);
+
+    Column joinCond = actualFileMetadataDS.col("path").equalTo(validFileMetadataDS.col("path"));
+
+    List<String> orphanFiles = actualFileMetadataDS.joinWith(validFileMetadataDS, joinCond, "leftouter")
+        .mapPartitions(findOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
+        .collectAsList();
+
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+      throw new ValidationException("Unable to determine whether certain files are orphan. " +
+          "Metadata references files that match listed/provided files except for authority/scheme. " +
+          "Please, inspect the conflicting authorities/schemes and provide which of them are equal " +
+          "by further configuring the action via equalSchemes() and equalAuthorities() methods. " +
+          "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " +
+          "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " +
+          "authorities/schemes are different. It will be impossible to recover deleted files. " +
+          "Conflicting authorities/schemes: %s.", conflicts.value());
+    }
+    return orphanFiles;
Review Comment:
nit: let's add an empty line before the return statement to separate the if block.
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -94,6 +100,27 @@ public InternalRow[] call(InternalRow args) {
     Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
+    Map<String, String> equalSchemes = Maps.newHashMap();
+    if (!args.isNullAt(6)) {
+      args.getMap(6).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalSchemes.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    Map<String, String> equalAuthorities = Maps.newHashMap();
+    if (!args.isNullAt(7)) {
+      args.getMap(7).foreach(DataTypes.StringType, DataTypes.StringType,
+          (k, v) -> {
+            equalSchemes.put(k.toString(), v.toString());
+            return BoxedUnit.UNIT;
+          });
+    }
+
+    DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode = args.isNullAt(8) ? null :
Review Comment:
nit: I'd have a direct import for `PrefixMismatchMode` to shorten the line, and I'd add `fromString` to the enum, as discussed [here](https://github.com/apache/iceberg/pull/4652/#discussion_r927171675).
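Combined, the two suggestions would look roughly like this (a sketch, assuming `fromString` lands on the enum as in the linked thread):
```
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;

// later, inside call(...), the line shortens to:
PrefixMismatchMode prefixMismatchMode =
    args.isNullAt(8) ? null : PrefixMismatchMode.fromString(args.getString(8));
```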
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -94,18 +101,15 @@
 extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
   private final SerializableConfiguration hadoopConf;
   private final int partitionDiscoveryParallelism;
   private final Table table;
+  private static final Splitter COMMA = Splitter.on(",");
Review Comment:
I think this still remains open: we have `LOG` (a static var) followed by final instance vars and then static vars again. We should group `LOG`, `COMMA`, `EQUAL_SCHEMES_DEFAULT` and then add an empty line before the final instance vars, like in the formatting above.
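In other words, something like this ordering (a sketch):
```
private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
private static final Splitter COMMA = Splitter.on(",");
private static final Map<String, String> EQUAL_SCHEMES_DEFAULT = ImmutableMap.of("s3n,s3a", "s3");

private final SerializableConfiguration hadoopConf;
private final int partitionDiscoveryParallelism;
private final Table table;
```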
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]